- Add app/db/ abstraction layer: Backend enum, DbConn wrapper, dialect helper (q() for ? vs %s paramstyle), get_conn(), tenant_id() - Auto-detect backend from DATABASE_URL; SQLite remains default when unset — no config change for local deployments - Add tenant_id column to all three logical DBs (main, context, incidents); idempotent ALTER TABLE migration runs before schema scripts on existing DBs - All INSERTs inject tenant_id; SELECTs use (tenant_id = ? OR tenant_id = '') for backward compat with pre-namespacing rows - Add docker-compose.yml with named volume turnstone_pgdata (survives rebuilds) and optional external Postgres support via DATABASE_URL override - Add scripts/migrate_sqlite_to_postgres.py — one-shot idempotent migration for existing SQLite data; ON CONFLICT DO NOTHING for safe re-runs - Fix SSH glean path in pipeline.py to use ensure_schema + get_conn (was still using raw sqlite3.connect + old _SCHEMA without tenant_id) - Fix FTS5 JOIN ambiguity: qualify repeat_count as f.repeat_count in search - Update all tests to use ensure_*_schema fixtures; add row_factory where needed - 394/394 tests passing Closes: #42 Closes: #50
80 lines
2.4 KiB
Python
80 lines
2.4 KiB
Python
"""Tests for app/context/wizard.py state machine."""
|
||
import sqlite3
|
||
import pytest
|
||
from pathlib import Path
|
||
from app.db.schema import ensure_context_schema
|
||
from app.context.wizard import get_schema, advance_step, is_complete, apply_session, TOTAL_STEPS
|
||
|
||
|
||
@pytest.fixture
|
||
def db(tmp_path):
|
||
db_path = tmp_path / "t.db"
|
||
ensure_context_schema(db_path)
|
||
return db_path
|
||
|
||
|
||
def test_get_schema_returns_steps():
|
||
schema = get_schema()
|
||
assert len(schema) == TOTAL_STEPS
|
||
assert all("step" in s and "id" in s and "title" in s for s in schema)
|
||
|
||
|
||
def test_advance_step_records_answer():
|
||
session: dict = {"current_step": 1, "answers": {}}
|
||
updated = advance_step(session, "os", "Linux (systemd/journald)")
|
||
assert updated["answers"]["os"] == "Linux (systemd/journald)"
|
||
assert updated["current_step"] == 2
|
||
|
||
|
||
def test_advance_step_is_immutable():
|
||
session: dict = {"current_step": 1, "answers": {}}
|
||
updated = advance_step(session, "os", "Linux (systemd/journald)")
|
||
assert session["current_step"] == 1 # original unchanged
|
||
|
||
|
||
def test_is_complete_false_mid_wizard():
|
||
session = {"current_step": 3, "answers": {}}
|
||
assert is_complete(session) is False
|
||
|
||
|
||
def test_is_complete_true_after_all_steps():
|
||
session = {"current_step": TOTAL_STEPS + 1, "answers": {}}
|
||
assert is_complete(session) is True
|
||
|
||
|
||
def test_apply_session_writes_hostname_fact(db):
|
||
session = {
|
||
"current_step": TOTAL_STEPS + 1,
|
||
"answers": {
|
||
"os": "Linux (systemd/journald)",
|
||
"hostname": "heimdall.local",
|
||
"services": "plex.service, sonarr.service",
|
||
"docker": "Yes — Docker",
|
||
"syslog": "No",
|
||
},
|
||
}
|
||
result = apply_session(db, session)
|
||
assert result["facts_written"] >= 2
|
||
assert result["source_count"] >= 3 # journald×2 + docker
|
||
|
||
conn = sqlite3.connect(str(db))
|
||
facts = conn.execute("SELECT key, value FROM context_facts").fetchall()
|
||
conn.close()
|
||
keys = [f[0] for f in facts]
|
||
assert "hostname" in keys
|
||
|
||
|
||
def test_apply_session_no_services(db):
|
||
session = {
|
||
"current_step": TOTAL_STEPS + 1,
|
||
"answers": {
|
||
"hostname": "strahl",
|
||
"os": "Linux (systemd/journald)",
|
||
"services": "",
|
||
"docker": "No",
|
||
"syslog": "No",
|
||
},
|
||
}
|
||
result = apply_session(db, session)
|
||
# At least one journald source (catch-all), no docker, no syslog
|
||
assert result["source_count"] == 1
|