"""Configurable embedding service — BSL licensed.

Backends:
  sentence_transformers — local in-process inference (default, no server needed)
  ollama                — HTTP to a running Ollama instance

Configuration (env vars):
  TURNSTONE_EMBED_BACKEND   sentence_transformers | ollama  (default: sentence_transformers)
  TURNSTONE_EMBED_MODEL     model name/path  (backend-specific default)
  TURNSTONE_EMBED_DEVICE    cpu | cuda  (default: cpu; ST backend only)
  TURNSTONE_LLM_URL         Ollama base URL  (default: http://localhost:11434)

When no backend is importable/reachable, EMBEDDING_AVAILABLE is False and
all embed calls return empty arrays — callers must handle this gracefully.
"""

from __future__ import annotations

import logging
import os
import struct
from typing import Protocol, runtime_checkable

import numpy as np

logger = logging.getLogger(__name__)

# ── Public availability flag ──────────────────────────────────────────────────

# Flipped to True by get_embedder() once a backend has been constructed.
EMBEDDING_AVAILABLE: bool = False

# ── Config ────────────────────────────────────────────────────────────────────

# All settings are read once, at import time.
_BACKEND = os.environ.get("TURNSTONE_EMBED_BACKEND", "sentence_transformers").lower()
_DEVICE = os.environ.get("TURNSTONE_EMBED_DEVICE", "cpu").lower()
_LLM_URL = os.environ.get("TURNSTONE_LLM_URL", "http://localhost:11434")

# BAAI/bge-small-en-v1.5: 33MB, MIT, 49M downloads/month, 384-dim, 512-token max.
# Benchmarked as the best quality-to-size ratio in the field (MTEB 62.17).
# all-MiniLM-L6-v2 is a viable lighter alternative (23MB, 256-token max) if
# inference speed is the primary constraint.
_DEFAULT_MODEL: dict[str, str] = {
    "sentence_transformers": "BAAI/bge-small-en-v1.5",
    "ollama": "nomic-embed-text",
}

# An explicit TURNSTONE_EMBED_MODEL wins; otherwise use the per-backend default,
# falling back to MiniLM when the backend name is not in the table.
_MODEL = os.environ.get(
    "TURNSTONE_EMBED_MODEL",
    _DEFAULT_MODEL.get(_BACKEND, "sentence-transformers/all-MiniLM-L6-v2"),
)


# ── Protocol ──────────────────────────────────────────────────────────────────


@runtime_checkable
class Embedder(Protocol):
    """Minimal interface all embedding backends must satisfy."""

    @property
    def dim(self) -> int:
        """Embedding dimension produced by this model."""
        ...

    @property
    def model_name(self) -> str:
        """Human-readable model identifier."""
        ...

    def embed(self, text: str) -> np.ndarray:
        """Embed a single string. Returns 1-D float32 array of length dim."""
        ...

    def embed_batch(self, texts: list[str]) -> list[np.ndarray]:
        """Embed a list of strings. Returns list of 1-D float32 arrays."""
        ...


# ── sentence-transformers backend ─────────────────────────────────────────────


class SentenceTransformerEmbedder:
    """Local in-process embedding via the sentence-transformers library.

    The model is downloaded from HuggingFace on first instantiation and cached
    at ~/.cache/huggingface/. Subsequent starts use the local cache.
    """

    def __init__(self, model_name: str = _MODEL, device: str = _DEVICE) -> None:
        """Load *model_name* onto *device* and probe its output dimension.

        Raises:
            ImportError: if the sentence-transformers package is not installed.
        """
        # Imported lazily so the module itself loads without the optional package.
        from sentence_transformers import SentenceTransformer  # type: ignore[import]

        logger.info("Loading embedding model %r on device %r ...", model_name, device)
        self._model = SentenceTransformer(model_name, device=device)
        self._model_name = model_name
        # Infer dimension from a test embed rather than hard-coding
        self._dim: int = int(self._model.encode("test").shape[0])
        logger.info("Embedding model ready — dim=%d", self._dim)

    @property
    def dim(self) -> int:
        """Embedding dimension measured at construction time."""
        return self._dim

    @property
    def model_name(self) -> str:
        """Model identifier passed to the constructor."""
        return self._model_name

    def embed(self, text: str) -> np.ndarray:
        """Embed one string; the model is asked for a normalised vector."""
        vec = self._model.encode(text, convert_to_numpy=True, normalize_embeddings=True)
        return vec.astype(np.float32)

    def embed_batch(self, texts: list[str]) -> list[np.ndarray]:
        """Embed many strings in batches of 32; an empty input yields ``[]``."""
        if not texts:
            return []
        vecs = self._model.encode(
            texts, convert_to_numpy=True, normalize_embeddings=True, batch_size=32
        )
        return [v.astype(np.float32) for v in vecs]


# ── Ollama backend ────────────────────────────────────────────────────────────


class OllamaEmbedder:
    """HTTP embedding via a running Ollama instance."""

    def __init__(
        self,
        model_name: str = _MODEL,
        llm_url: str = _LLM_URL,
        timeout: float = 30.0,
    ) -> None:
        """Create the HTTP client and probe the embedding dimension.

        Args:
            model_name: model name sent in every request.
            llm_url: Ollama base URL; a trailing slash is tolerated.
            timeout: timeout handed to the underlying ``httpx.Client``.
        """
        import httpx  # already a project dependency

        self._model_name = model_name
        self._url = f"{llm_url.rstrip('/')}/api/embeddings"
        self._timeout = timeout
        self._client = httpx.Client(timeout=timeout)
        # Probe dimension with a test call
        self._dim = self._probe_dim()

    def _probe_dim(self) -> int:
        """Return the dimension of a test embedding, or 768 when the probe fails.

        Any failure is logged and swallowed so construction never raises here.
        An empty embedding counts as a failure: reporting ``dim == 0`` would
        mislead callers that size storage from it.
        """
        try:
            vec = self._raw_embed("probe")
            if not vec:
                raise ValueError("empty embedding in response")
            return len(vec)
        except Exception as exc:
            logger.warning("Ollama dim probe failed (%s) — defaulting to 768", exc)
            return 768

    def _raw_embed(self, text: str) -> list[float]:
        """POST *text* to the embeddings endpoint and return the raw vector.

        Returns ``[]`` when the response carries no ``"embedding"`` value.
        ``raise_for_status()`` raises on an HTTP error status.
        """
        resp = self._client.post(
            self._url, json={"model": self._model_name, "prompt": text}
        )
        resp.raise_for_status()
        return resp.json().get("embedding") or []

    @property
    def dim(self) -> int:
        """Embedding dimension found by the probe (768 when the probe failed)."""
        return self._dim

    @property
    def model_name(self) -> str:
        """Model identifier passed to the constructor."""
        return self._model_name

    def embed(self, text: str) -> np.ndarray:
        """Embed one string; the vector is returned as received, not re-normalised."""
        vec = self._raw_embed(text)
        return np.array(vec, dtype=np.float32)

    def embed_batch(self, texts: list[str]) -> list[np.ndarray]:
        """Embed each string with its own request, preserving input order."""
        return [self.embed(t) for t in texts]


# ── Singleton factory ─────────────────────────────────────────────────────────

_embedder: Embedder | None = None


def get_embedder() -> Embedder | None:
    """Return the configured embedder singleton, or None when unavailable.

    Lazy-initialises on first call. Callers should check EMBEDDING_AVAILABLE
    or test for None rather than calling this unconditionally.

    A failed initialisation is not remembered: while no embedder exists, every
    call attempts construction again. For the Ollama backend a failed dimension
    probe is tolerated by the embedder itself, so the singleton is still
    created and EMBEDDING_AVAILABLE is set even if the server did not answer.
    """
    global _embedder, EMBEDDING_AVAILABLE

    if _embedder is not None:
        return _embedder

    if _BACKEND == "sentence_transformers":
        try:
            _embedder = SentenceTransformerEmbedder(_MODEL, _DEVICE)
            EMBEDDING_AVAILABLE = True
        except ImportError:
            logger.warning(
                "sentence-transformers not installed — embeddings disabled. "
                "Install with: pip install sentence-transformers"
            )
        except Exception as exc:
            logger.warning("Failed to load sentence-transformers model %r: %s", _MODEL, exc)
    elif _BACKEND == "ollama":
        try:
            _embedder = OllamaEmbedder(_MODEL, _LLM_URL)
            EMBEDDING_AVAILABLE = True
        except Exception as exc:
            logger.warning("Ollama embedder init failed: %s", exc)
    else:
        logger.warning("Unknown TURNSTONE_EMBED_BACKEND %r — embeddings disabled", _BACKEND)

    return _embedder


# ── BLOB serialisation helpers ────────────────────────────────────────────────


def pack_vector(vec: np.ndarray) -> bytes:
    """Serialise a float32 numpy vector to a SQLite BLOB.

    The input is converted to float32 and written as raw native-byte-order
    bytes, 4 bytes per element — the same layout ``struct.pack("Nf", ...)``
    produces. Multi-dimensional input is flattened in C order.
    """
    # tobytes() copies the buffer directly instead of boxing every element
    # into a Python float first.
    return np.asarray(vec, dtype=np.float32).tobytes()


def unpack_vector(blob: bytes) -> np.ndarray:
    """Deserialise a SQLite BLOB back to a float32 numpy vector.

    Returns a writable 1-D float32 array that owns its data.

    Raises:
        struct.error: if the blob length is not a multiple of 4 bytes.
    """
    size = len(blob)
    if size % 4:  # 4 bytes per float32
        raise struct.error(
            f"vector blob length {size} is not a multiple of 4 bytes"
        )
    if not size:
        return np.empty(0, dtype=np.float32)
    # frombuffer() returns a read-only view over the blob; copy so the result
    # is writable and independent of the source buffer.
    return np.frombuffer(blob, dtype=np.float32).copy()


def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine similarity between two vectors.

    Both vectors are re-normalised defensively so callers need not pre-normalise.
    Returns 0.0 when either vector has zero norm.
    """
    norm_a = np.linalg.norm(a)
    norm_b = np.linalg.norm(b)
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return float(np.dot(a, b) / (norm_a * norm_b))