turnstone/tests/context/test_store.py
pyr0ball 0311d72e53 feat: dual-backend SQLite/Postgres + multi-tenant source namespacing
- Add app/db/ abstraction layer: Backend enum, DbConn wrapper,
  dialect helper (q() for ? vs %s paramstyle), get_conn(), tenant_id()
- Auto-detect backend from DATABASE_URL; SQLite remains default when
  unset — no config change for local deployments
- Add tenant_id column to all three logical DBs (main, context, incidents);
  idempotent ALTER TABLE migration runs before schema scripts on existing DBs
- All INSERTs inject tenant_id; SELECTs use (tenant_id = ? OR tenant_id = '')
  for backward compat with pre-namespacing rows
- Add docker-compose.yml with named volume turnstone_pgdata (survives rebuilds)
  and optional external Postgres support via DATABASE_URL override
- Add scripts/migrate_sqlite_to_postgres.py — one-shot idempotent migration
  for existing SQLite data; ON CONFLICT DO NOTHING for safe re-runs
- Fix SSH glean path in pipeline.py to use ensure_schema + get_conn
  (was still using raw sqlite3.connect + old _SCHEMA without tenant_id)
- Fix FTS5 JOIN ambiguity: qualify repeat_count as f.repeat_count in search
- Update all tests to use ensure_*_schema fixtures; add row_factory where needed
- 394/394 tests passing

Closes: #42
Closes: #50
2026-06-08 08:37:54 -07:00

89 lines
2.6 KiB
Python

"""Tests for app/context/store.py — fact and document CRUD."""
import sqlite3
import pytest
from pathlib import Path
from app.db.schema import ensure_context_schema
from app.context.store import (
add_fact, list_facts, delete_fact,
add_document, list_documents, delete_document,
ContextFact, ContextDocument,
)
@pytest.fixture
def db(tmp_path):
    """Return the path of a per-test database file with the context schema applied."""
    database_file = tmp_path / "t.db"
    # Create the context tables up front so each test starts from an empty store.
    ensure_context_schema(database_file)
    return database_file
def test_add_fact_returns_fact(db):
    """add_fact returns a ContextFact that echoes the values it was given."""
    created = add_fact(db, "host", "hostname", "heimdall.local", source="wizard")

    assert isinstance(created, ContextFact)
    assert created.id

    # Fields that must round-trip unchanged from the call arguments.
    expected_fields = {
        "category": "host",
        "key": "hostname",
        "value": "heimdall.local",
        "source": "wizard",
    }
    for attribute, wanted in expected_fields.items():
        assert getattr(created, attribute) == wanted

    assert created.created_at
def test_list_facts_empty(db):
    """list_facts on a freshly created store yields an empty list."""
    found = list_facts(db)
    assert found == []
def test_list_facts_all(db):
    """list_facts without a category filter returns every stored fact."""
    seed_rows = (
        ("host", "hostname", "heimdall.local"),
        ("service", "plex", "port:32400"),
    )
    for category, key, value in seed_rows:
        add_fact(db, category, key, value)

    everything = list_facts(db)
    assert len(everything) == 2
def test_list_facts_by_category(db):
    """list_facts(category=...) only returns facts stored under that category."""
    seed_rows = (
        ("host", "hostname", "heimdall.local"),
        ("service", "plex", "port:32400"),
        ("service", "sonarr", "port:8989"),
    )
    for category, key, value in seed_rows:
        add_fact(db, category, key, value)

    host_facts = list_facts(db, category="host")
    assert len(host_facts) == 1

    service_facts = list_facts(db, category="service")
    assert len(service_facts) == 2
def test_delete_fact_returns_true(db):
    """delete_fact reports True for an existing fact and removes it from the store."""
    stored = add_fact(db, "note", "k", "v")

    was_deleted = delete_fact(db, stored.id)
    assert was_deleted is True

    remaining = list_facts(db)
    assert remaining == []
def test_delete_fact_missing_returns_false(db):
    """delete_fact reports False when no fact has the given id."""
    outcome = delete_fact(db, "nonexistent")
    assert outcome is False
def test_add_document_returns_document(db):
    """add_document returns a ContextDocument carrying the filename and doc type."""
    body_text = "# Plex\nRestart with systemctl"
    stored = add_document(db, "runbook.md", "markdown", body_text, file_size=100)

    assert isinstance(stored, ContextDocument)
    assert stored.id
    assert stored.filename == "runbook.md"
    assert stored.doc_type == "markdown"
def test_list_documents_empty(db):
    """list_documents on a freshly created store yields an empty list."""
    found = list_documents(db)
    assert found == []
def test_delete_document_cascades_chunks(db):
    """Deleting a document must also remove the chunk rows that reference it.

    A chunk row is inserted directly through sqlite3, the document is deleted
    via delete_document, and the chunk table is then queried to confirm that
    no rows remain for that document id.
    """
    # Imported locally so this test does not depend on the module's import block.
    from contextlib import closing

    doc = add_document(db, "test.md", "markdown", "content")

    # closing() releases the connection even if a statement raises; sqlite3's
    # own `with conn:` only manages the transaction and never closes it.
    with closing(sqlite3.connect(str(db))) as conn:
        conn.execute(
            "INSERT INTO context_chunks(id, document_id, chunk_index, text) VALUES (?,?,0,?)",
            ("c1", doc.id, "chunk text"),
        )
        conn.commit()

    # The seeding connection is closed before the store opens its own.
    assert delete_document(db, doc.id) is True

    with closing(sqlite3.connect(str(db))) as conn:
        chunks = conn.execute(
            "SELECT * FROM context_chunks WHERE document_id=?", (doc.id,)
        ).fetchall()

    assert chunks == []
def test_delete_document_missing_returns_false(db):
    """delete_document reports False when no document has the given id."""
    outcome = delete_document(db, "nonexistent")
    assert outcome is False