feat: wizard state machine — structured Q&A writes context facts and source config

This commit is contained in:
pyr0ball 2026-05-13 16:25:52 -07:00
parent de662725ee
commit abb61a6e90
2 changed files with 212 additions and 0 deletions

125
app/context/wizard.py Normal file
View file

@ -0,0 +1,125 @@
"""Wizard state machine — MIT (structured Q&A); BSL gate reserved for LLM path."""
from __future__ import annotations
from pathlib import Path
from typing import Any
from app.context.store import add_fact
WIZARD_STEPS: list[dict[str, Any]] = [
{
"step": 1,
"id": "os",
"title": "What operating system is this server running?",
"type": "select",
"options": ["Linux (systemd/journald)", "Linux (other init)", "Other"],
"optional": False,
"help": "This determines which log sources Turnstone can watch.",
},
{
"step": 2,
"id": "hostname",
"title": "What is this server's hostname?",
"type": "text",
"placeholder": "e.g. heimdall.local",
"optional": False,
"help": "Used to label log sources in diagnosis results.",
},
{
"step": 3,
"id": "services",
"title": "Which named systemd services do you want to monitor?",
"type": "text",
"placeholder": "e.g. plex.service, sonarr.service",
"optional": True,
"help": "Comma-separated. Leave blank to collect all journal output.",
},
{
"step": 4,
"id": "docker",
"title": "Are you running Docker or Podman containers?",
"type": "select",
"options": ["Yes — Docker", "Yes — Podman", "No"],
"optional": False,
"help": "Turnstone can tail container log streams directly.",
},
{
"step": 5,
"id": "syslog",
"title": "Do any network devices send syslog UDP to this server?",
"type": "select",
"options": ["Yes — UDP 514", "Yes — custom port", "No"],
"optional": False,
"help": "Routers, switches, and APs can forward logs via UDP syslog.",
},
{
"step": 6,
"id": "syslog_port",
"title": "What UDP port does your syslog device send to?",
"type": "text",
"placeholder": "e.g. 514",
"optional": True,
"condition": {"step_id": "syslog", "value": "Yes — custom port"},
"help": "Only needed if you chose 'custom port' above.",
},
]
TOTAL_STEPS: int = len(WIZARD_STEPS)
def get_schema() -> list[dict[str, Any]]:
"""Return the wizard schema (all steps)."""
return WIZARD_STEPS
def advance_step(session: dict[str, Any], step_id: str, answer: Any) -> dict[str, Any]:
"""Return a new session dict with the answer recorded and current_step incremented."""
answers = {**session.get("answers", {}), step_id: answer}
return {**session, "answers": answers, "current_step": session.get("current_step", 1) + 1}
def is_complete(session: dict[str, Any]) -> bool:
"""Check if wizard session has progressed past all steps."""
return session.get("current_step", 1) > TOTAL_STEPS
def apply_session(db_path: Path, session: dict[str, Any]) -> dict[str, Any]:
"""Write context facts and return source config from a completed wizard session."""
answers: dict[str, Any] = session.get("answers", {})
facts_written = 0
hostname = str(answers.get("hostname") or "this-host")
if answers.get("hostname"):
add_fact(db_path, "host", "hostname", hostname, source="wizard")
facts_written += 1
if answers.get("os"):
add_fact(db_path, "host", "os", str(answers["os"]), source="wizard")
facts_written += 1
sources: list[dict[str, Any]] = []
services_raw = str(answers.get("services") or "")
services = [s.strip() for s in services_raw.split(",") if s.strip()]
if services:
for svc in services:
sources.append({"type": "journald", "id": f"journal:{hostname}:{svc}", "unit": svc})
else:
sources.append({"type": "journald", "id": f"journal:{hostname}"})
docker_answer = str(answers.get("docker") or "No")
if "Docker" in docker_answer:
sources.append({"type": "docker", "id": f"docker:{hostname}"})
elif "Podman" in docker_answer:
sources.append({"type": "docker", "id": f"podman:{hostname}", "runtime": "podman"})
syslog_answer = str(answers.get("syslog") or "No")
if syslog_answer.startswith("Yes"):
port = int(answers.get("syslog_port") or 514)
sources.append({"type": "syslog", "id": f"syslog:{hostname}", "port": port})
return {
"facts_written": facts_written,
"sources": sources,
"source_count": len(sources),
}

View file

@ -0,0 +1,87 @@
"""Tests for app/context/wizard.py state machine."""
import sqlite3
import pytest
from pathlib import Path
from app.context.wizard import get_schema, advance_step, is_complete, apply_session, TOTAL_STEPS
@pytest.fixture
def db(tmp_path):
db_path = tmp_path / "t.db"
conn = sqlite3.connect(str(db_path))
conn.executescript("""
CREATE TABLE context_facts (
id TEXT PRIMARY KEY, category TEXT NOT NULL, key TEXT NOT NULL,
value TEXT NOT NULL, source TEXT, created_at TEXT NOT NULL
);
""")
conn.commit()
conn.close()
return db_path
def test_get_schema_returns_steps():
schema = get_schema()
assert len(schema) == TOTAL_STEPS
assert all("step" in s and "id" in s and "title" in s for s in schema)
def test_advance_step_records_answer():
session: dict = {"current_step": 1, "answers": {}}
updated = advance_step(session, "os", "Linux (systemd/journald)")
assert updated["answers"]["os"] == "Linux (systemd/journald)"
assert updated["current_step"] == 2
def test_advance_step_is_immutable():
session: dict = {"current_step": 1, "answers": {}}
updated = advance_step(session, "os", "Linux (systemd/journald)")
assert session["current_step"] == 1 # original unchanged
def test_is_complete_false_mid_wizard():
session = {"current_step": 3, "answers": {}}
assert is_complete(session) is False
def test_is_complete_true_after_all_steps():
session = {"current_step": TOTAL_STEPS + 1, "answers": {}}
assert is_complete(session) is True
def test_apply_session_writes_hostname_fact(db):
session = {
"current_step": TOTAL_STEPS + 1,
"answers": {
"os": "Linux (systemd/journald)",
"hostname": "heimdall.local",
"services": "plex.service, sonarr.service",
"docker": "Yes — Docker",
"syslog": "No",
},
}
result = apply_session(db, session)
assert result["facts_written"] >= 2
assert result["source_count"] >= 3 # journald×2 + docker
conn = sqlite3.connect(str(db))
facts = conn.execute("SELECT key, value FROM context_facts").fetchall()
conn.close()
keys = [f[0] for f in facts]
assert "hostname" in keys
def test_apply_session_no_services(db):
session = {
"current_step": TOTAL_STEPS + 1,
"answers": {
"hostname": "strahl",
"os": "Linux (systemd/journald)",
"services": "",
"docker": "No",
"syslog": "No",
},
}
result = apply_session(db, session)
# At least one journald source (catch-all), no docker, no syslog
assert result["source_count"] == 1