turnstone/app/services/embeddings.py
pyr0ball 5f32a6678d refactor: extract embeddings service layer — decouple context embedder from Ollama
- New app/services/embeddings.py: TURNSTONE_EMBED_* env vars, multi-backend support
- embedder.py delegates to service layer; re-exports EMBEDDING_AVAILABLE for compat
- retriever.py updated to use service layer
- Test coverage updated in tests/context/test_embedder.py
2026-05-25 11:01:25 -07:00

229 lines
8.6 KiB
Python

"""Configurable embedding service — BSL licensed.
Backends:
sentence_transformers — local in-process inference (default, no server needed)
ollama — HTTP to a running Ollama instance
Configuration (env vars):
TURNSTONE_EMBED_BACKEND sentence_transformers | ollama (default: sentence_transformers)
TURNSTONE_EMBED_MODEL model name/path (backend-specific default)
TURNSTONE_EMBED_DEVICE cpu | cuda (default: cpu; ST backend only)
TURNSTONE_LLM_URL Ollama base URL (default: http://localhost:11434)
When no backend is importable/reachable, EMBEDDING_AVAILABLE is False and all
embed calls return empty arrays — callers must handle this gracefully.
"""
from __future__ import annotations
import logging
import os
import struct
from typing import Protocol, runtime_checkable
import numpy as np
logger = logging.getLogger(__name__)

# ── Public availability flag ──────────────────────────────────────────────────
# False at import time; get_embedder() sets it to True after a successful init.
EMBEDDING_AVAILABLE: bool = False

# ── Config ────────────────────────────────────────────────────────────────────
# Every setting is read from the environment exactly once, when the module is
# imported. Backend and device names are compared case-insensitively.
_BACKEND = os.getenv("TURNSTONE_EMBED_BACKEND", "sentence_transformers").lower()
_DEVICE = os.getenv("TURNSTONE_EMBED_DEVICE", "cpu").lower()
_LLM_URL = os.getenv("TURNSTONE_LLM_URL", "http://localhost:11434")

# BAAI/bge-small-en-v1.5: 33MB, MIT, 49M downloads/month, 384-dim, 512-token max.
# Benchmarked as the best quality-to-size ratio in the field (MTEB 62.17).
# all-MiniLM-L6-v2 is a viable lighter alternative (23MB, 256-token max) if
# inference speed is the primary constraint.
_DEFAULT_MODEL: dict[str, str] = dict(
    sentence_transformers="BAAI/bge-small-en-v1.5",
    ollama="nomic-embed-text",
)

# An explicitly set TURNSTONE_EMBED_MODEL always wins (even when empty);
# otherwise use the per-backend default, with all-MiniLM-L6-v2 as the fallback
# for backend names that have no entry in the table above.
try:
    _MODEL = os.environ["TURNSTONE_EMBED_MODEL"]
except KeyError:
    _MODEL = _DEFAULT_MODEL.get(_BACKEND, "sentence-transformers/all-MiniLM-L6-v2")
# ── Protocol ──────────────────────────────────────────────────────────────────
@runtime_checkable
class Embedder(Protocol):
    """Structural interface that every embedding backend has to provide.

    The protocol is runtime-checkable, so ``isinstance(obj, Embedder)`` can be
    used to test whether an object exposes the members listed below.
    """

    @property
    def dim(self) -> int:
        """Dimension of the embeddings produced by this model."""

    @property
    def model_name(self) -> str:
        """Human-readable identifier of the model."""

    def embed(self, text: str) -> np.ndarray:
        """Embed one string; returns a 1-D float32 array of length ``dim``."""

    def embed_batch(self, texts: list[str]) -> list[np.ndarray]:
        """Embed several strings; returns a list of 1-D float32 arrays."""
# ── sentence-transformers backend ─────────────────────────────────────────────
class SentenceTransformerEmbedder:
    """Embedding backend that runs a sentence-transformers model in-process.

    The model is downloaded from HuggingFace on first instantiation and cached
    at ~/.cache/huggingface/. Subsequent starts use the local cache.
    """

    def __init__(self, model_name: str = _MODEL, device: str = _DEVICE) -> None:
        """Load ``model_name`` onto ``device`` and measure its output dimension.

        Raises:
            ImportError: if the sentence-transformers package is not installed.
        """
        # Imported here rather than at module level so that a missing package
        # surfaces as an ImportError at construction time, not at import time.
        from sentence_transformers import SentenceTransformer  # type: ignore[import]

        logger.info("Loading embedding model %r on device %r ...", model_name, device)
        self._model_name = model_name
        self._model = SentenceTransformer(model_name, device=device)
        # Measure the dimension with a throwaway encode instead of hard-coding it.
        sample = self._model.encode("test")
        self._dim: int = int(sample.shape[0])
        logger.info("Embedding model ready — dim=%d", self._dim)

    @property
    def dim(self) -> int:
        """Embedding dimension measured when the model was loaded."""
        return self._dim

    @property
    def model_name(self) -> str:
        """Model name or path this embedder was constructed with."""
        return self._model_name

    def embed(self, text: str) -> np.ndarray:
        """Return the normalised embedding of ``text`` as a float32 array."""
        return self._model.encode(
            text, convert_to_numpy=True, normalize_embeddings=True
        ).astype(np.float32)

    def embed_batch(self, texts: list[str]) -> list[np.ndarray]:
        """Return one normalised float32 embedding per entry of ``texts``.

        An empty ``texts`` yields an empty list without calling the model.
        """
        encoded = (
            self._model.encode(
                texts, convert_to_numpy=True, normalize_embeddings=True, batch_size=32
            )
            if texts
            else ()
        )
        return [row.astype(np.float32) for row in encoded]
# ── Ollama backend ────────────────────────────────────────────────────────────
class OllamaEmbedder:
    """HTTP embedding via a running Ollama instance.

    Each request POSTs ``{"model": ..., "prompt": ...}`` to
    ``<llm_url>/api/embeddings`` and reads the ``"embedding"`` field of the
    JSON response. One ``httpx.Client`` is created per instance and reused for
    every request; call :meth:`close` to release it.
    """

    # Dimension reported when the startup probe cannot determine the real one.
    _FALLBACK_DIM = 768

    def __init__(
        self,
        model_name: str = _MODEL,
        llm_url: str = _LLM_URL,
        timeout: float = 30.0,
    ) -> None:
        """Create the HTTP client and probe the model's embedding dimension.

        Args:
            model_name: Model identifier sent to Ollama with every request.
            llm_url: Base URL of the Ollama server; a trailing slash is ignored.
            timeout: Timeout in seconds handed to the HTTP client.

        A failed probe does not raise: the dimension falls back to 768 and a
        warning is logged.
        """
        import httpx  # already a project dependency

        self._model_name = model_name
        self._url = f"{llm_url.rstrip('/')}/api/embeddings"
        self._timeout = timeout
        self._client = httpx.Client(timeout=timeout)
        # Probe dimension with a test call
        self._dim: int = self._probe_dim()

    def _probe_dim(self) -> int:
        """Return the length of a test embedding, or 768 if it cannot be found."""
        try:
            vec = self._raw_embed("probe")
        except Exception as exc:
            logger.warning(
                "Ollama dim probe failed (%s) — defaulting to %d",
                exc,
                self._FALLBACK_DIM,
            )
            return self._FALLBACK_DIM
        if not vec:
            # A successful response without an embedding would otherwise be
            # reported as dim == 0; treat it like any other failed probe.
            logger.warning(
                "Ollama dim probe returned no embedding for model %r — defaulting to %d",
                self._model_name,
                self._FALLBACK_DIM,
            )
            return self._FALLBACK_DIM
        return len(vec)

    def _raw_embed(self, text: str) -> list[float]:
        """POST ``text`` to the embeddings endpoint and return the raw vector.

        Returns an empty list when the response carries no ``"embedding"``
        value. Transport errors and non-success status codes propagate as the
        exceptions raised by httpx.
        """
        resp = self._client.post(
            self._url, json={"model": self._model_name, "prompt": text}
        )
        resp.raise_for_status()
        return resp.json().get("embedding") or []

    @property
    def dim(self) -> int:
        """Embedding dimension found by the startup probe (768 if it failed)."""
        return self._dim

    @property
    def model_name(self) -> str:
        """Model identifier sent to Ollama."""
        return self._model_name

    def embed(self, text: str) -> np.ndarray:
        """Embed ``text`` and return the result as a float32 array.

        The array is empty when the server returns no embedding.
        """
        vec = self._raw_embed(text)
        return np.array(vec, dtype=np.float32)

    def embed_batch(self, texts: list[str]) -> list[np.ndarray]:
        """Embed each entry of ``texts`` with one HTTP request per entry."""
        return [self.embed(t) for t in texts]

    def close(self) -> None:
        """Close the underlying HTTP client and release its connections."""
        self._client.close()
# ── Singleton factory ─────────────────────────────────────────────────────────
_embedder: Embedder | None = None


def get_embedder() -> Embedder | None:
    """Return the process-wide embedder, creating it on first use.

    The backend is chosen by TURNSTONE_EMBED_BACKEND. On success the instance
    is cached in the module and EMBEDDING_AVAILABLE is set to True. On any
    failure a warning is logged, None is returned and nothing is cached, so
    the next call attempts initialisation again.

    Callers should check EMBEDDING_AVAILABLE or test for None rather than
    calling this unconditionally.
    """
    global _embedder, EMBEDDING_AVAILABLE

    # Already initialised: hand back the cached instance.
    if _embedder is not None:
        return _embedder

    if _BACKEND not in ("sentence_transformers", "ollama"):
        logger.warning("Unknown TURNSTONE_EMBED_BACKEND %r — embeddings disabled", _BACKEND)
        return None

    if _BACKEND == "ollama":
        try:
            instance = OllamaEmbedder(_MODEL, _LLM_URL)
        except Exception as exc:
            logger.warning("Ollama embedder init failed: %s", exc)
        else:
            _embedder = instance
            EMBEDDING_AVAILABLE = True
    else:
        try:
            instance = SentenceTransformerEmbedder(_MODEL, _DEVICE)
        except ImportError:
            # The optional dependency is missing: tell the operator how to fix it.
            logger.warning(
                "sentence-transformers not installed — embeddings disabled. "
                "Install with: pip install sentence-transformers"
            )
        except Exception as exc:
            logger.warning("Failed to load sentence-transformers model %r: %s", _MODEL, exc)
        else:
            _embedder = instance
            EMBEDDING_AVAILABLE = True

    return _embedder
# ── BLOB serialisation helpers ────────────────────────────────────────────────
def pack_vector(vec: np.ndarray) -> bytes:
    """Serialise a float32 numpy vector to a SQLite BLOB.

    The input is converted to float32 (any array-like is accepted) and written
    as raw 4-byte floats in the machine's native byte order, which is the same
    layout ``struct.pack(f"{n}f", ...)`` produces. Intended for 1-D vectors;
    multi-dimensional input is written flat in C order.

    Args:
        vec: Vector to serialise.

    Returns:
        ``4 * n`` bytes for a vector of ``n`` elements (``b""`` when empty).
    """
    # tobytes() copies the buffer in one step instead of boxing every element
    # into a Python float and packing them one by one.
    return np.asarray(vec, dtype=np.float32).tobytes()
def unpack_vector(blob: bytes) -> np.ndarray:
    """Deserialise a SQLite BLOB back to a float32 numpy vector.

    The blob is read as consecutive 4-byte floats in native byte order, the
    inverse of ``pack_vector``.

    Args:
        blob: Raw bytes whose length is a multiple of 4.

    Returns:
        A new, writable 1-D float32 array (empty for an empty blob).

    Raises:
        struct.error: if the blob length is not a multiple of 4 bytes.
    """
    itemsize = np.dtype(np.float32).itemsize  # 4 bytes per float32
    if len(blob) % itemsize:
        # Same exception type struct.unpack raises for a size mismatch, so
        # callers that catch it for corrupt rows keep working.
        raise struct.error(
            f"blob length {len(blob)} is not a multiple of {itemsize} bytes"
        )
    if not blob:
        return np.empty(0, dtype=np.float32)
    # frombuffer gives a read-only view onto the blob; copy so the caller gets
    # an independent, writable array.
    return np.frombuffer(blob, dtype=np.float32).copy()
def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine similarity between two vectors.

    The dot product is divided by both norms, so callers need not
    pre-normalise their inputs. The result is clamped to [-1.0, 1.0] because
    floating-point rounding can otherwise push it marginally outside that
    range (for example 1.0000001 for two identical float32 vectors).

    Args:
        a: First vector.
        b: Second vector, of the same length as ``a``.

    Returns:
        The similarity in [-1.0, 1.0], or 0.0 when either vector has zero norm.
    """
    norm_a = np.linalg.norm(a)
    norm_b = np.linalg.norm(b)
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    similarity = np.dot(a, b) / (norm_a * norm_b)
    # np.clip is used rather than min()/max() so that a NaN result stays NaN
    # instead of being turned into a bound.
    return float(np.clip(similarity, -1.0, 1.0))