Merge feat/60-incidents-db: split incidents tables to dedicated DB (#60)
This commit is contained in:
commit
1ebe216f4e
6 changed files with 140 additions and 34 deletions
|
|
@ -46,6 +46,8 @@ CREATE INDEX IF NOT EXISTS idx_ts_repeat ON log_entries(timestamp_iso, repeat_
|
||||||
CREATE INDEX IF NOT EXISTS idx_severity ON log_entries(severity);
|
CREATE INDEX IF NOT EXISTS idx_severity ON log_entries(severity);
|
||||||
CREATE INDEX IF NOT EXISTS idx_patterns ON log_entries(matched_patterns);
|
CREATE INDEX IF NOT EXISTS idx_patterns ON log_entries(matched_patterns);
|
||||||
|
|
||||||
|
-- incidents tables moved to ensure_incidents_schema() / INCIDENTS_DB_PATH
|
||||||
|
-- kept here as no-ops so legacy single-file deployments still work
|
||||||
CREATE TABLE IF NOT EXISTS incidents (
|
CREATE TABLE IF NOT EXISTS incidents (
|
||||||
id TEXT PRIMARY KEY,
|
id TEXT PRIMARY KEY,
|
||||||
label TEXT NOT NULL,
|
label TEXT NOT NULL,
|
||||||
|
|
@ -56,8 +58,6 @@ CREATE TABLE IF NOT EXISTS incidents (
|
||||||
created_at TEXT NOT NULL,
|
created_at TEXT NOT NULL,
|
||||||
severity TEXT NOT NULL DEFAULT 'medium'
|
severity TEXT NOT NULL DEFAULT 'medium'
|
||||||
);
|
);
|
||||||
CREATE INDEX IF NOT EXISTS idx_incidents_time ON incidents(started_at, ended_at);
|
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS received_bundles (
|
CREATE TABLE IF NOT EXISTS received_bundles (
|
||||||
id TEXT PRIMARY KEY,
|
id TEXT PRIMARY KEY,
|
||||||
source_host TEXT NOT NULL,
|
source_host TEXT NOT NULL,
|
||||||
|
|
@ -69,9 +69,6 @@ CREATE TABLE IF NOT EXISTS received_bundles (
|
||||||
entry_count INTEGER NOT NULL DEFAULT 0,
|
entry_count INTEGER NOT NULL DEFAULT 0,
|
||||||
bundle_json TEXT NOT NULL
|
bundle_json TEXT NOT NULL
|
||||||
);
|
);
|
||||||
CREATE INDEX IF NOT EXISTS idx_bundles_bundled ON received_bundles(bundled_at);
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_bundles_type ON received_bundles(issue_type);
|
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS sent_bundles (
|
CREATE TABLE IF NOT EXISTS sent_bundles (
|
||||||
id TEXT PRIMARY KEY,
|
id TEXT PRIMARY KEY,
|
||||||
incident_id TEXT NOT NULL,
|
incident_id TEXT NOT NULL,
|
||||||
|
|
@ -80,8 +77,6 @@ CREATE TABLE IF NOT EXISTS sent_bundles (
|
||||||
entry_count INTEGER NOT NULL DEFAULT 0,
|
entry_count INTEGER NOT NULL DEFAULT 0,
|
||||||
bundle_json TEXT NOT NULL
|
bundle_json TEXT NOT NULL
|
||||||
);
|
);
|
||||||
CREATE INDEX IF NOT EXISTS idx_sent_bundles_incident ON sent_bundles(incident_id);
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_sent_bundles_time ON sent_bundles(exported_at);
|
|
||||||
|
|
||||||
-- context tables moved to ensure_context_schema() / CONTEXT_DB_PATH
|
-- context tables moved to ensure_context_schema() / CONTEXT_DB_PATH
|
||||||
-- kept here as no-ops so legacy single-file deployments still work
|
-- kept here as no-ops so legacy single-file deployments still work
|
||||||
|
|
@ -206,6 +201,97 @@ def ensure_context_schema(db_path: Path) -> None:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
_INCIDENTS_SCHEMA = """
|
||||||
|
CREATE TABLE IF NOT EXISTS incidents (
|
||||||
|
id TEXT PRIMARY KEY,
|
||||||
|
label TEXT NOT NULL,
|
||||||
|
issue_type TEXT NOT NULL DEFAULT '',
|
||||||
|
started_at TEXT,
|
||||||
|
ended_at TEXT,
|
||||||
|
notes TEXT NOT NULL DEFAULT '',
|
||||||
|
created_at TEXT NOT NULL,
|
||||||
|
severity TEXT NOT NULL DEFAULT 'medium'
|
||||||
|
);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_incidents_time ON incidents(started_at, ended_at);
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS received_bundles (
|
||||||
|
id TEXT PRIMARY KEY,
|
||||||
|
source_host TEXT NOT NULL,
|
||||||
|
issue_type TEXT NOT NULL DEFAULT '',
|
||||||
|
label TEXT NOT NULL,
|
||||||
|
severity TEXT NOT NULL DEFAULT 'medium',
|
||||||
|
started_at TEXT,
|
||||||
|
bundled_at TEXT NOT NULL,
|
||||||
|
entry_count INTEGER NOT NULL DEFAULT 0,
|
||||||
|
bundle_json TEXT NOT NULL
|
||||||
|
);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_bundles_bundled ON received_bundles(bundled_at);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_bundles_type ON received_bundles(issue_type);
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS sent_bundles (
|
||||||
|
id TEXT PRIMARY KEY,
|
||||||
|
incident_id TEXT NOT NULL,
|
||||||
|
exported_at TEXT NOT NULL,
|
||||||
|
sanitized INTEGER NOT NULL DEFAULT 0,
|
||||||
|
entry_count INTEGER NOT NULL DEFAULT 0,
|
||||||
|
bundle_json TEXT NOT NULL
|
||||||
|
);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_sent_bundles_incident ON sent_bundles(incident_id);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_sent_bundles_time ON sent_bundles(exported_at);
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
def ensure_incidents_schema(db_path: Path) -> None:
|
||||||
|
"""Create incidents tables in a dedicated database file.
|
||||||
|
|
||||||
|
Using a separate file from the main log DB means incident writes never
|
||||||
|
contend with the FTS5 bulk-insert write lock held by the glean scheduler.
|
||||||
|
Mirrors the context_facts split (CONTEXT_DB_PATH / turnstone-context.db).
|
||||||
|
"""
|
||||||
|
conn = sqlite3.connect(str(db_path), timeout=30.0)
|
||||||
|
conn.execute("PRAGMA journal_mode=WAL")
|
||||||
|
conn.executescript(_INCIDENTS_SCHEMA)
|
||||||
|
for stmt in [
|
||||||
|
"ALTER TABLE incidents ADD COLUMN issue_type TEXT NOT NULL DEFAULT ''",
|
||||||
|
]:
|
||||||
|
try:
|
||||||
|
conn.execute(stmt)
|
||||||
|
except sqlite3.OperationalError:
|
||||||
|
pass
|
||||||
|
conn.commit()
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
def migrate_incidents_to_dedicated_db(main_db: Path, incidents_db: Path) -> int:
|
||||||
|
"""One-shot migration: copy incidents/bundles rows from main DB to incidents DB.
|
||||||
|
|
||||||
|
Safe to call on every startup — rows already present in incidents_db are
|
||||||
|
skipped via INSERT OR IGNORE. Returns the count of rows migrated.
|
||||||
|
"""
|
||||||
|
src = sqlite3.connect(str(main_db), timeout=30.0)
|
||||||
|
src.row_factory = sqlite3.Row
|
||||||
|
dst = sqlite3.connect(str(incidents_db), timeout=30.0)
|
||||||
|
migrated = 0
|
||||||
|
for table in ("incidents", "received_bundles", "sent_bundles"):
|
||||||
|
try:
|
||||||
|
rows = src.execute(f"SELECT * FROM {table}").fetchall() # noqa: S608
|
||||||
|
except sqlite3.OperationalError:
|
||||||
|
continue
|
||||||
|
if not rows:
|
||||||
|
continue
|
||||||
|
cols = ", ".join(rows[0].keys())
|
||||||
|
placeholders = ", ".join("?" * len(rows[0].keys()))
|
||||||
|
dst.executemany(
|
||||||
|
f"INSERT OR IGNORE INTO {table} ({cols}) VALUES ({placeholders})", # noqa: S608
|
||||||
|
[tuple(r) for r in rows],
|
||||||
|
)
|
||||||
|
migrated += len(rows)
|
||||||
|
dst.commit()
|
||||||
|
src.close()
|
||||||
|
dst.close()
|
||||||
|
return migrated
|
||||||
|
|
||||||
|
|
||||||
def _fingerprint(path: Path) -> tuple[float, int]:
|
def _fingerprint(path: Path) -> tuple[float, int]:
|
||||||
"""Return (mtime, size) for a file — cheap identity check, no content read needed."""
|
"""Return (mtime, size) for a file — cheap identity check, no content read needed."""
|
||||||
st = path.stat()
|
st = path.stat()
|
||||||
|
|
|
||||||
44
app/rest.py
44
app/rest.py
|
|
@ -35,7 +35,7 @@ from fastapi.responses import FileResponse, RedirectResponse, StreamingResponse
|
||||||
from fastapi.staticfiles import StaticFiles
|
from fastapi.staticfiles import StaticFiles
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
|
||||||
from app.glean.pipeline import ensure_schema, ensure_context_schema, glean_file as _glean_file, glean_ssh_source as _glean_ssh_source
|
from app.glean.pipeline import ensure_schema, ensure_context_schema, ensure_incidents_schema, migrate_incidents_to_dedicated_db, glean_file as _glean_file, glean_ssh_source as _glean_ssh_source
|
||||||
from app.glean.base import load_compiled_patterns, now_iso
|
from app.glean.base import load_compiled_patterns, now_iso
|
||||||
from app.glean.tautulli import parse_webhook as _parse_tautulli
|
from app.glean.tautulli import parse_webhook as _parse_tautulli
|
||||||
from app.glean.wazuh import is_wazuh_alert as _is_wazuh_alert, parse as _parse_wazuh
|
from app.glean.wazuh import is_wazuh_alert as _is_wazuh_alert, parse as _parse_wazuh
|
||||||
|
|
@ -95,6 +95,11 @@ DB_PATH = Path(os.environ.get("TURNSTONE_DB", Path(__file__).parent.parent / "da
|
||||||
CONTEXT_DB_PATH = Path(
|
CONTEXT_DB_PATH = Path(
|
||||||
os.environ.get("TURNSTONE_CONTEXT_DB", DB_PATH.parent / "turnstone-context.db")
|
os.environ.get("TURNSTONE_CONTEXT_DB", DB_PATH.parent / "turnstone-context.db")
|
||||||
)
|
)
|
||||||
|
# Incidents get their own file so incident writes never block behind the FTS5
|
||||||
|
# bulk-insert write lock held by the glean scheduler during log bursts.
|
||||||
|
INCIDENTS_DB_PATH = Path(
|
||||||
|
os.environ.get("TURNSTONE_INCIDENTS_DB", DB_PATH.parent / "turnstone-incidents.db")
|
||||||
|
)
|
||||||
PREFS_PATH = DB_PATH.parent / "preferences.json"
|
PREFS_PATH = DB_PATH.parent / "preferences.json"
|
||||||
DIST_DIR = Path(__file__).parent.parent / "web" / "dist"
|
DIST_DIR = Path(__file__).parent.parent / "web" / "dist"
|
||||||
SOURCE_HOST = os.environ.get("TURNSTONE_SOURCE_HOST", "unknown")
|
SOURCE_HOST = os.environ.get("TURNSTONE_SOURCE_HOST", "unknown")
|
||||||
|
|
@ -135,6 +140,12 @@ async def _lifespan(app: FastAPI):
|
||||||
_audit_log.addHandler(h)
|
_audit_log.addHandler(h)
|
||||||
ensure_schema(DB_PATH)
|
ensure_schema(DB_PATH)
|
||||||
ensure_context_schema(CONTEXT_DB_PATH)
|
ensure_context_schema(CONTEXT_DB_PATH)
|
||||||
|
ensure_incidents_schema(INCIDENTS_DB_PATH)
|
||||||
|
migrated = migrate_incidents_to_dedicated_db(DB_PATH, INCIDENTS_DB_PATH)
|
||||||
|
if migrated:
|
||||||
|
logging.getLogger(__name__).info(
|
||||||
|
"Migrated %d incident/bundle rows from main DB to incidents DB", migrated
|
||||||
|
)
|
||||||
_compiled_patterns = load_compiled_patterns(PATTERN_FILE)
|
_compiled_patterns = load_compiled_patterns(PATTERN_FILE)
|
||||||
watch_cfg_path = PATTERN_DIR / "watch.yaml"
|
watch_cfg_path = PATTERN_DIR / "watch.yaml"
|
||||||
configs = load_watch_config(watch_cfg_path)
|
configs = load_watch_config(watch_cfg_path)
|
||||||
|
|
@ -451,6 +462,7 @@ async def diagnose_post_stream(body: DiagnoseRequest) -> StreamingResponse:
|
||||||
llm_model=prefs.get("llm_model") or None,
|
llm_model=prefs.get("llm_model") or None,
|
||||||
llm_api_key=prefs.get("llm_api_key") or None,
|
llm_api_key=prefs.get("llm_api_key") or None,
|
||||||
context_db_path=CONTEXT_DB_PATH,
|
context_db_path=CONTEXT_DB_PATH,
|
||||||
|
incidents_db_path=INCIDENTS_DB_PATH,
|
||||||
tech_level=prefs.get("tech_level", "sysadmin"),
|
tech_level=prefs.get("tech_level", "sysadmin"),
|
||||||
):
|
):
|
||||||
yield f"data: {json.dumps(event)}\n\n"
|
yield f"data: {json.dumps(event)}\n\n"
|
||||||
|
|
@ -924,7 +936,7 @@ def get_stats(
|
||||||
@router.post("/api/incidents")
|
@router.post("/api/incidents")
|
||||||
def create_incident_endpoint(body: IncidentCreate) -> dict:
|
def create_incident_endpoint(body: IncidentCreate) -> dict:
|
||||||
incident = create_incident(
|
incident = create_incident(
|
||||||
DB_PATH,
|
INCIDENTS_DB_PATH,
|
||||||
label=body.label,
|
label=body.label,
|
||||||
issue_type=body.issue_type,
|
issue_type=body.issue_type,
|
||||||
started_at=body.started_at,
|
started_at=body.started_at,
|
||||||
|
|
@ -937,15 +949,15 @@ def create_incident_endpoint(body: IncidentCreate) -> dict:
|
||||||
|
|
||||||
@router.get("/api/incidents")
|
@router.get("/api/incidents")
|
||||||
def list_incidents_endpoint() -> dict:
|
def list_incidents_endpoint() -> dict:
|
||||||
return {"incidents": [dataclasses.asdict(i) for i in list_incidents(DB_PATH)]}
|
return {"incidents": [dataclasses.asdict(i) for i in list_incidents(INCIDENTS_DB_PATH)]}
|
||||||
|
|
||||||
|
|
||||||
@router.get("/api/incidents/{incident_id}")
|
@router.get("/api/incidents/{incident_id}")
|
||||||
def get_incident_endpoint(incident_id: str) -> dict:
|
def get_incident_endpoint(incident_id: str) -> dict:
|
||||||
incident = get_incident(DB_PATH, incident_id)
|
incident = get_incident(INCIDENTS_DB_PATH, incident_id)
|
||||||
if not incident:
|
if not incident:
|
||||||
raise HTTPException(status_code=404, detail="Incident not found")
|
raise HTTPException(status_code=404, detail="Incident not found")
|
||||||
entries = get_incident_entries(DB_PATH, incident)
|
entries = get_incident_entries(INCIDENTS_DB_PATH, incident)
|
||||||
return {
|
return {
|
||||||
**dataclasses.asdict(incident),
|
**dataclasses.asdict(incident),
|
||||||
"entries": [dataclasses.asdict(e) for e in entries],
|
"entries": [dataclasses.asdict(e) for e in entries],
|
||||||
|
|
@ -954,24 +966,24 @@ def get_incident_endpoint(incident_id: str) -> dict:
|
||||||
|
|
||||||
@router.delete("/api/incidents/{incident_id}")
|
@router.delete("/api/incidents/{incident_id}")
|
||||||
def delete_incident_endpoint(incident_id: str) -> dict:
|
def delete_incident_endpoint(incident_id: str) -> dict:
|
||||||
if not delete_incident(DB_PATH, incident_id):
|
if not delete_incident(INCIDENTS_DB_PATH, incident_id):
|
||||||
raise HTTPException(status_code=404, detail="Incident not found")
|
raise HTTPException(status_code=404, detail="Incident not found")
|
||||||
return {"deleted": incident_id}
|
return {"deleted": incident_id}
|
||||||
|
|
||||||
|
|
||||||
@router.get("/api/incidents/{incident_id}/bundle")
|
@router.get("/api/incidents/{incident_id}/bundle")
|
||||||
def get_incident_bundle(incident_id: str, sanitize: bool = False) -> dict:
|
def get_incident_bundle(incident_id: str, sanitize: bool = False) -> dict:
|
||||||
incident = get_incident(DB_PATH, incident_id)
|
incident = get_incident(INCIDENTS_DB_PATH, incident_id)
|
||||||
if not incident:
|
if not incident:
|
||||||
raise HTTPException(status_code=404, detail="Incident not found")
|
raise HTTPException(status_code=404, detail="Incident not found")
|
||||||
bundle = build_bundle(DB_PATH, incident, source_host=SOURCE_HOST, sanitize=sanitize)
|
bundle = build_bundle(INCIDENTS_DB_PATH, incident, source_host=SOURCE_HOST, sanitize=sanitize)
|
||||||
record_sent_bundle(DB_PATH, incident_id, bundle, sanitized=sanitize)
|
record_sent_bundle(INCIDENTS_DB_PATH, incident_id, bundle, sanitized=sanitize)
|
||||||
return bundle
|
return bundle
|
||||||
|
|
||||||
|
|
||||||
@router.get("/api/sent-bundles")
|
@router.get("/api/sent-bundles")
|
||||||
def list_sent_bundles_endpoint() -> dict:
|
def list_sent_bundles_endpoint() -> dict:
|
||||||
bundles = list_sent_bundles(DB_PATH)
|
bundles = list_sent_bundles(INCIDENTS_DB_PATH)
|
||||||
return {"bundles": [dataclasses.asdict(b) for b in bundles]}
|
return {"bundles": [dataclasses.asdict(b) for b in bundles]}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -979,11 +991,11 @@ def list_sent_bundles_endpoint() -> dict:
|
||||||
def send_incident_bundle(incident_id: str, sanitize: bool = False) -> dict:
|
def send_incident_bundle(incident_id: str, sanitize: bool = False) -> dict:
|
||||||
if not BUNDLE_ENDPOINT:
|
if not BUNDLE_ENDPOINT:
|
||||||
raise HTTPException(status_code=503, detail="TURNSTONE_BUNDLE_ENDPOINT not configured")
|
raise HTTPException(status_code=503, detail="TURNSTONE_BUNDLE_ENDPOINT not configured")
|
||||||
incident = get_incident(DB_PATH, incident_id)
|
incident = get_incident(INCIDENTS_DB_PATH, incident_id)
|
||||||
if not incident:
|
if not incident:
|
||||||
raise HTTPException(status_code=404, detail="Incident not found")
|
raise HTTPException(status_code=404, detail="Incident not found")
|
||||||
bundle = build_bundle(DB_PATH, incident, source_host=SOURCE_HOST, sanitize=sanitize)
|
bundle = build_bundle(INCIDENTS_DB_PATH, incident, source_host=SOURCE_HOST, sanitize=sanitize)
|
||||||
record_sent_bundle(DB_PATH, incident_id, bundle, sanitized=sanitize)
|
record_sent_bundle(INCIDENTS_DB_PATH, incident_id, bundle, sanitized=sanitize)
|
||||||
payload = json.dumps(bundle).encode()
|
payload = json.dumps(bundle).encode()
|
||||||
req = urllib.request.Request(
|
req = urllib.request.Request(
|
||||||
BUNDLE_ENDPOINT,
|
BUNDLE_ENDPOINT,
|
||||||
|
|
@ -1002,19 +1014,19 @@ def send_incident_bundle(incident_id: str, sanitize: bool = False) -> dict:
|
||||||
|
|
||||||
@router.post("/api/bundles")
|
@router.post("/api/bundles")
|
||||||
def receive_bundle(bundle: dict) -> dict:
|
def receive_bundle(bundle: dict) -> dict:
|
||||||
record = store_bundle(DB_PATH, bundle)
|
record = store_bundle(INCIDENTS_DB_PATH, bundle)
|
||||||
return {"id": record.id, "entry_count": record.entry_count}
|
return {"id": record.id, "entry_count": record.entry_count}
|
||||||
|
|
||||||
|
|
||||||
@router.get("/api/bundles")
|
@router.get("/api/bundles")
|
||||||
def list_bundles_endpoint() -> dict:
|
def list_bundles_endpoint() -> dict:
|
||||||
bundles = list_bundles(DB_PATH)
|
bundles = list_bundles(INCIDENTS_DB_PATH)
|
||||||
return {"bundles": [dataclasses.asdict(b) for b in bundles]}
|
return {"bundles": [dataclasses.asdict(b) for b in bundles]}
|
||||||
|
|
||||||
|
|
||||||
@router.get("/api/bundles/{bundle_id}")
|
@router.get("/api/bundles/{bundle_id}")
|
||||||
def get_bundle_endpoint(bundle_id: str) -> dict:
|
def get_bundle_endpoint(bundle_id: str) -> dict:
|
||||||
bundle = get_bundle(DB_PATH, bundle_id)
|
bundle = get_bundle(INCIDENTS_DB_PATH, bundle_id)
|
||||||
if not bundle:
|
if not bundle:
|
||||||
raise HTTPException(status_code=404, detail="Bundle not found")
|
raise HTTPException(status_code=404, detail="Bundle not found")
|
||||||
return dataclasses.asdict(bundle)
|
return dataclasses.asdict(bundle)
|
||||||
|
|
|
||||||
|
|
@ -196,6 +196,7 @@ async def diagnose_stream(
|
||||||
llm_model: str | None = None,
|
llm_model: str | None = None,
|
||||||
llm_api_key: str | None = None,
|
llm_api_key: str | None = None,
|
||||||
context_db_path: Path | None = None,
|
context_db_path: Path | None = None,
|
||||||
|
incidents_db_path: Path | None = None,
|
||||||
tech_level: str = "sysadmin",
|
tech_level: str = "sysadmin",
|
||||||
) -> AsyncGenerator[dict[str, Any], None]:
|
) -> AsyncGenerator[dict[str, Any], None]:
|
||||||
"""Async generator yielding SSE event dicts for the diagnose pipeline.
|
"""Async generator yielding SSE event dicts for the diagnose pipeline.
|
||||||
|
|
@ -318,6 +319,7 @@ async def diagnose_stream(
|
||||||
llm_model=llm_model,
|
llm_model=llm_model,
|
||||||
llm_api_key=llm_api_key,
|
llm_api_key=llm_api_key,
|
||||||
tech_level=tech_level,
|
tech_level=tech_level,
|
||||||
|
incidents_db_path=incidents_db_path,
|
||||||
):
|
):
|
||||||
yield event
|
yield event
|
||||||
return # pipeline emits its own "done" event
|
return # pipeline emits its own "done" event
|
||||||
|
|
|
||||||
|
|
@ -38,6 +38,7 @@ async def run_pipeline(
|
||||||
llm_model: str | None,
|
llm_model: str | None,
|
||||||
llm_api_key: str | None,
|
llm_api_key: str | None,
|
||||||
tech_level: str = "sysadmin",
|
tech_level: str = "sysadmin",
|
||||||
|
incidents_db_path: Path | None = None,
|
||||||
) -> AsyncGenerator[dict[str, Any], None]:
|
) -> AsyncGenerator[dict[str, Any], None]:
|
||||||
"""Async generator that runs all 5 pipeline stages and yields SSE event dicts.
|
"""Async generator that runs all 5 pipeline stages and yields SSE event dicts.
|
||||||
|
|
||||||
|
|
@ -124,9 +125,10 @@ async def run_pipeline(
|
||||||
}
|
}
|
||||||
|
|
||||||
# Stage 4: False-positive suppression
|
# Stage 4: False-positive suppression
|
||||||
|
_incidents_db = incidents_db_path or db_path
|
||||||
try:
|
try:
|
||||||
ranked = await asyncio.to_thread(
|
ranked = await asyncio.to_thread(
|
||||||
FalsePositiveSuppressor().suppress, hypotheses, db_path
|
FalsePositiveSuppressor().suppress, hypotheses, _incidents_db
|
||||||
)
|
)
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
logger.exception("Stage 4 (suppressor) failed: %s", exc)
|
logger.exception("Stage 4 (suppressor) failed: %s", exc)
|
||||||
|
|
|
||||||
|
|
@ -65,14 +65,14 @@ except ImportError: # pragma: no cover
|
||||||
# DB helpers
|
# DB helpers
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
def _fetch_resolved_incidents(db_path: Path) -> list[str]:
|
def _fetch_resolved_incidents(incidents_db_path: Path) -> list[str]:
|
||||||
"""Fetch resolved incident texts from SQLite.
|
"""Fetch resolved incident texts from the incidents database.
|
||||||
|
|
||||||
Returns a list of non-empty combined strings for each resolved incident.
|
Returns a list of non-empty combined strings for each resolved incident.
|
||||||
Returns an empty list on any error (missing table, connection failure, etc.).
|
Returns an empty list on any error (missing table, connection failure, etc.).
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
with sqlite3.connect(str(db_path), timeout=30.0) as conn:
|
with sqlite3.connect(str(incidents_db_path), timeout=30.0) as conn:
|
||||||
cursor = conn.execute(
|
cursor = conn.execute(
|
||||||
"SELECT label, notes FROM incidents WHERE ended_at IS NOT NULL LIMIT 200"
|
"SELECT label, notes FROM incidents WHERE ended_at IS NOT NULL LIMIT 200"
|
||||||
)
|
)
|
||||||
|
|
@ -125,13 +125,13 @@ class FalsePositiveSuppressor:
|
||||||
def suppress(
|
def suppress(
|
||||||
self,
|
self,
|
||||||
hypotheses: list[Hypothesis],
|
hypotheses: list[Hypothesis],
|
||||||
db_path: Path,
|
incidents_db_path: Path,
|
||||||
) -> list[RankedHypothesis]:
|
) -> list[RankedHypothesis]:
|
||||||
"""Rank hypotheses by novelty, suppressing those matching resolved incidents.
|
"""Rank hypotheses by novelty, suppressing those matching resolved incidents.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
hypotheses: Candidate hypotheses from Stage 3.
|
hypotheses: Candidate hypotheses from Stage 3.
|
||||||
db_path: Path to the Turnstone SQLite database containing incidents.
|
incidents_db_path: Path to the dedicated incidents SQLite database.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
List of RankedHypothesis sorted by (novelty_score * confidence) descending.
|
List of RankedHypothesis sorted by (novelty_score * confidence) descending.
|
||||||
|
|
@ -153,14 +153,14 @@ class FalsePositiveSuppressor:
|
||||||
)
|
)
|
||||||
return self._passthrough(hypotheses)
|
return self._passthrough(hypotheses)
|
||||||
|
|
||||||
# Fetch corpus texts from DB; fall back to passthrough if corpus is empty.
|
# Fetch corpus texts from incidents DB; fall back to passthrough if empty.
|
||||||
corpus_texts = _fetch_resolved_incidents(db_path)
|
corpus_texts = _fetch_resolved_incidents(incidents_db_path)
|
||||||
if not corpus_texts:
|
if not corpus_texts:
|
||||||
logger.debug("No resolved incidents found — all hypotheses treated as novel")
|
logger.debug("No resolved incidents found — all hypotheses treated as novel")
|
||||||
return self._passthrough(hypotheses)
|
return self._passthrough(hypotheses)
|
||||||
|
|
||||||
# Embed corpus (with caching).
|
# Embed corpus (with caching).
|
||||||
corpus_embeddings = self._get_corpus_embeddings(embedder, corpus_texts, db_path)
|
corpus_embeddings = self._get_corpus_embeddings(embedder, corpus_texts, incidents_db_path)
|
||||||
|
|
||||||
# Score each hypothesis and sort by novelty * confidence descending.
|
# Score each hypothesis and sort by novelty * confidence descending.
|
||||||
ranked = [
|
ranked = [
|
||||||
|
|
@ -230,10 +230,10 @@ class FalsePositiveSuppressor:
|
||||||
self,
|
self,
|
||||||
embedder: Any,
|
embedder: Any,
|
||||||
corpus_texts: list[str],
|
corpus_texts: list[str],
|
||||||
db_path: Path,
|
incidents_db_path: Path,
|
||||||
) -> list[list[float]]:
|
) -> list[list[float]]:
|
||||||
"""Return cached corpus embeddings, re-embedding if the corpus has changed."""
|
"""Return cached corpus embeddings, re-embedding if the corpus has changed."""
|
||||||
cache_key = str(db_path)
|
cache_key = str(incidents_db_path)
|
||||||
cached = _corpus_cache.get(cache_key)
|
cached = _corpus_cache.get(cache_key)
|
||||||
|
|
||||||
if cached is not None:
|
if cached is not None:
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,8 @@ def client(tmp_path):
|
||||||
ensure_schema(db)
|
ensure_schema(db)
|
||||||
|
|
||||||
with patch.object(rest_module, "DB_PATH", db), \
|
with patch.object(rest_module, "DB_PATH", db), \
|
||||||
|
patch.object(rest_module, "CONTEXT_DB_PATH", tmp_path / "context.db"), \
|
||||||
|
patch.object(rest_module, "INCIDENTS_DB_PATH", tmp_path / "incidents.db"), \
|
||||||
patch.object(rest_module, "PREFS_PATH", tmp_path / "prefs.json"), \
|
patch.object(rest_module, "PREFS_PATH", tmp_path / "prefs.json"), \
|
||||||
patch.object(rest_module, "_compiled_patterns", []):
|
patch.object(rest_module, "_compiled_patterns", []):
|
||||||
with TestClient(rest_module.app, raise_server_exceptions=True) as c:
|
with TestClient(rest_module.app, raise_server_exceptions=True) as c:
|
||||||
|
|
@ -41,6 +43,8 @@ def client_with_candidate(tmp_path):
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
with patch.object(rest_module, "DB_PATH", db), \
|
with patch.object(rest_module, "DB_PATH", db), \
|
||||||
|
patch.object(rest_module, "CONTEXT_DB_PATH", tmp_path / "context.db"), \
|
||||||
|
patch.object(rest_module, "INCIDENTS_DB_PATH", tmp_path / "incidents.db"), \
|
||||||
patch.object(rest_module, "PREFS_PATH", tmp_path / "prefs.json"), \
|
patch.object(rest_module, "PREFS_PATH", tmp_path / "prefs.json"), \
|
||||||
patch.object(rest_module, "_compiled_patterns", []):
|
patch.object(rest_module, "_compiled_patterns", []):
|
||||||
with TestClient(rest_module.app, raise_server_exceptions=True) as c:
|
with TestClient(rest_module.app, raise_server_exceptions=True) as c:
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue