feat: cf-core v0.19.0 — add PDF extraction, VectorStore, LLMRouter.embed()
Some checks failed
CI / test (push) Waiting to run
Mirror / mirror (push) Has been cancelled
Release — PyPI / release (push) Has been cancelled

This commit is contained in:
pyr0ball 2026-05-04 16:11:57 -07:00
commit ccc6a15d94
11 changed files with 966 additions and 58 deletions

View file

@ -0,0 +1,133 @@
# circuitforge_core/documents/pdf.py
"""
circuitforge_core.documents.pdf PDF text extraction and page-level chunking.
Primary path: pdfplumber (selectable text layers).
Fallback: pytesseract OCR (scanned / image-only pages).
Usage::
from circuitforge_core.documents.pdf import PDFExtractor
chunks = PDFExtractor().chunk_pages("/path/to/book.pdf")
for chunk in chunks:
print(f"[p.{chunk.page_number}] ({chunk.source}) {chunk.text[:80]}")
"""
from __future__ import annotations
import io
import logging
from dataclasses import dataclass
from pathlib import Path
logger = logging.getLogger(__name__)
try:
import pdfplumber
except ImportError: # pragma: no cover
pdfplumber = None # type: ignore[assignment]
try:
import pytesseract
except ImportError: # pragma: no cover
pytesseract = None # type: ignore[assignment]
try:
from PIL import Image
except ImportError: # pragma: no cover
Image = None # type: ignore[assignment]
@dataclass(frozen=True)
class PageChunk:
"""Text content extracted from a single PDF page."""
page_number: int # 1-indexed
text: str
source: str # "text_layer" | "ocr"
word_count: int
class PDFExtractor:
"""
Extract page-level text chunks from PDF files.
Args:
ocr_min_words: Pages with fewer words from the text layer trigger OCR.
"""
def __init__(self, ocr_min_words: int = 10) -> None:
self.ocr_min_words = ocr_min_words
def chunk_pages(self, pdf_path: str | Path) -> list[PageChunk]:
"""
Primary entry point. Returns one PageChunk per page.
Uses text-layer extraction per page; falls back to OCR when text is sparse.
Empty PDFs return an empty list.
"""
if pdfplumber is None:
raise ImportError(
"pdfplumber is required for PDF extraction. "
"Install it with: pip install pdfplumber"
)
path = Path(pdf_path)
chunks: list[PageChunk] = []
with pdfplumber.open(path) as pdf:
for i, page in enumerate(pdf.pages, start=1):
text = page.extract_text() or ""
words = text.split()
if len(words) >= self.ocr_min_words:
chunks.append(
PageChunk(
page_number=i,
text=text.strip(),
source="text_layer",
word_count=len(words),
)
)
else:
logger.debug(
"pdf: page %d sparse (%d words), falling back to OCR",
i,
len(words),
)
chunks.append(self._ocr_page(page, i))
return chunks
def _ocr_page(self, page: object, page_number: int) -> PageChunk:
"""Render page to image and extract text via tesseract."""
try:
rendered = page.to_image(resolution=200).original # type: ignore[attr-defined]
rendered = _ensure_pil_image(rendered)
text = pytesseract.image_to_string(rendered) # type: ignore[union-attr]
words = text.split()
return PageChunk(
page_number=page_number,
text=text.strip(),
source="ocr",
word_count=len(words),
)
except Exception as exc:
logger.warning("pdf: OCR failed for page %d: %s", page_number, exc)
return PageChunk(
page_number=page_number, text="", source="ocr", word_count=0
)
def _ensure_pil_image(rendered: object) -> object:
"""Return *rendered* as a PIL Image, converting from bytes if needed."""
if Image is None:
return rendered
try:
if not isinstance(rendered, Image.Image):
rendered = Image.open(io.BytesIO(rendered)) # type: ignore[arg-type]
except TypeError:
# Image may be patched (e.g. in tests); skip the conversion.
pass
return rendered

View file

@ -43,6 +43,7 @@ When llm.yaml is absent, the router builds a minimal config from environment
variables: ANTHROPIC_API_KEY, OPENAI_API_KEY / OPENAI_BASE_URL, OLLAMA_HOST.
Ollama on localhost:11434 is always included as the lowest-cost local fallback.
"""
import logging
import os
import yaml
@ -70,7 +71,8 @@ class LLMRouter:
)
logger.info(
"[LLMRouter] No llm.yaml found — using env-var auto-config "
"(backends: %s)", ", ".join(env_config["fallback_order"])
"(backends: %s)",
", ".join(env_config["fallback_order"]),
)
self.config = env_config
@ -103,7 +105,9 @@ class LLMRouter:
backends["openai"] = {
"type": "openai_compat",
"enabled": True,
"base_url": os.environ.get("OPENAI_BASE_URL", "https://api.openai.com/v1"),
"base_url": os.environ.get(
"OPENAI_BASE_URL", "https://api.openai.com/v1"
),
"model": os.environ.get("OPENAI_MODEL", "gpt-4o-mini"),
"api_key": os.environ.get("OPENAI_API_KEY"),
"supports_images": True,
@ -156,6 +160,7 @@ class LLMRouter:
Caller MUST call ctx.__exit__(None, None, None) in a finally block.
"""
import os
orch_cfg = backend.get("cf_orch")
if not orch_cfg:
return None
@ -164,6 +169,7 @@ class LLMRouter:
return None
try:
from circuitforge_orch.client import CFOrchClient
client = CFOrchClient(orch_url)
service = orch_cfg.get("service", "vllm")
candidates = orch_cfg.get("model_candidates", [])
@ -181,14 +187,21 @@ class LLMRouter:
alloc = ctx.__enter__()
return (ctx, alloc)
except Exception as exc:
logger.warning("[LLMRouter] cf_orch allocation failed, using base_url directly: %s", exc)
logger.warning(
"[LLMRouter] cf_orch allocation failed, using base_url directly: %s",
exc,
)
return None
def complete(self, prompt: str, system: str | None = None,
model_override: str | None = None,
fallback_order: list[str] | None = None,
images: list[str] | None = None,
max_tokens: int | None = None) -> str:
def complete(
self,
prompt: str,
system: str | None = None,
model_override: str | None = None,
fallback_order: list[str] | None = None,
images: list[str] | None = None,
max_tokens: int | None = None,
) -> str:
"""
Generate a completion. Tries each backend in fallback_order.
@ -206,7 +219,11 @@ class LLMRouter:
"AI inference is disabled in the public demo. "
"Run your own instance to use AI features."
)
order = fallback_order if fallback_order is not None else self.config["fallback_order"]
order = (
fallback_order
if fallback_order is not None
else self.config["fallback_order"]
)
for name in order:
backend = self.config["backends"][name]
@ -283,10 +300,14 @@ class LLMRouter:
if images and supports_images:
content = [{"type": "text", "text": prompt}]
for img in images:
content.append({
"type": "image_url",
"image_url": {"url": f"data:image/png;base64,{img}"},
})
content.append(
{
"type": "image_url",
"image_url": {
"url": f"data:image/png;base64,{img}"
},
}
)
messages.append({"role": "user", "content": content})
else:
messages.append({"role": "user", "content": prompt})
@ -311,18 +332,27 @@ class LLMRouter:
elif backend["type"] == "anthropic":
api_key = os.environ.get(backend["api_key_env"], "")
if not api_key:
print(f"[LLMRouter] {name}: {backend['api_key_env']} not set, skipping")
print(
f"[LLMRouter] {name}: {backend['api_key_env']} not set, skipping"
)
continue
try:
import anthropic as _anthropic
client = _anthropic.Anthropic(api_key=api_key)
if images and supports_images:
content = []
for img in images:
content.append({
"type": "image",
"source": {"type": "base64", "media_type": "image/png", "data": img},
})
content.append(
{
"type": "image",
"source": {
"type": "base64",
"media_type": "image/png",
"data": img,
},
}
)
content.append({"type": "text", "text": prompt})
else:
content = prompt
@ -342,6 +372,81 @@ class LLMRouter:
raise RuntimeError("All LLM backends exhausted")
def embed(
self,
texts: list[str],
model_override: str | None = None,
fallback_order: list[str] | None = None,
) -> list[list[float]]:
"""
Generate embeddings for a list of texts.
Only openai_compat backends are tried Ollama and vLLM expose
/v1/embeddings; anthropic and vision_service do not.
Uses ``embedding_model`` from backend config when present;
falls back to ``model`` (the chat model) otherwise.
Args:
texts: Texts to embed (batched in a single API call).
model_override: Override the embedding model for this call.
fallback_order: Override the backend fallback order for this call.
Returns:
List of float vectors, one per input text, in input order.
Raises:
RuntimeError: If all eligible backends are exhausted.
"""
if os.environ.get("DEMO_MODE", "").lower() in ("1", "true", "yes"):
raise RuntimeError(
"AI inference is disabled in the public demo. "
"Run your own instance to use AI features."
)
order = (
fallback_order
if fallback_order is not None
else self.config["fallback_order"]
)
for name in order:
backend = self.config["backends"][name]
if not backend.get("enabled", True):
continue
if backend["type"] != "openai_compat":
continue
orch_ctx = orch_alloc = None
orch_result = self._try_cf_orch_alloc(backend)
if orch_result is not None:
orch_ctx, orch_alloc = orch_result
backend = {**backend, "base_url": orch_alloc.url + "/v1"}
elif not self._is_reachable(backend["base_url"]):
print(f"[LLMRouter] {name}: unreachable, skipping")
continue
try:
client = OpenAI(
base_url=backend["base_url"],
api_key=backend.get("api_key") or "any",
)
model = model_override or backend.get(
"embedding_model", backend["model"]
)
resp = client.embeddings.create(model=model, input=texts)
print(f"[LLMRouter] embed: used backend {name} ({model})")
return [item.embedding for item in resp.data]
except Exception as e:
print(f"[LLMRouter] {name}: embed error — {e}, trying next")
continue
finally:
if orch_ctx is not None:
try:
orch_ctx.__exit__(None, None, None)
except Exception:
pass
raise RuntimeError("All LLM backends exhausted for embed()")
# Module-level singleton for convenience
_router: LLMRouter | None = None

View file

@ -0,0 +1,4 @@
from .base import VectorMatch, VectorStore
from .sqlite_vec import LocalSQLiteVecStore
__all__ = ["VectorMatch", "VectorStore", "LocalSQLiteVecStore"]

View file

@ -0,0 +1,50 @@
"""
circuitforge_core.vector.base VectorStore ABC and shared types.
Concrete implementations: LocalSQLiteVecStore (local), QdrantStore (cloud Paid tier).
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any
@dataclass(frozen=True)
class VectorMatch:
"""A single result from a vector similarity search."""
entry_id: str
score: float # lower is better (L2 / cosine distance)
metadata: dict[str, Any] = field(default_factory=dict)
class VectorStore(ABC):
"""Abstract interface for vector storage backends."""
@abstractmethod
def upsert(
self, entry_id: str, vector: list[float], metadata: dict[str, Any]
) -> None:
"""Insert or replace a vector and its metadata."""
@abstractmethod
def query(
self,
vector: list[float],
top_k: int = 10,
filter_metadata: dict[str, Any] | None = None,
) -> list[VectorMatch]:
"""Return the top_k nearest vectors. Optional metadata filter applied post-search."""
@abstractmethod
def delete(self, entry_id: str) -> None:
"""Remove a single vector by string ID. No-op if not found."""
@abstractmethod
def delete_where(self, filter_metadata: dict[str, Any]) -> int:
"""Remove all vectors whose metadata matches all key-value pairs. Returns count removed.
Raises ValueError if filter_metadata is empty (would delete entire store).
"""

View file

@ -0,0 +1,185 @@
# circuitforge_core/vector/sqlite_vec.py
"""
circuitforge_core.vector.sqlite_vec -- sqlite-vec backed VectorStore.
Suitable for single-user local deployments. Cloud Paid tier replaces
this with QdrantStore via the same VectorStore ABC.
"""
from __future__ import annotations
import json
import logging
import re
import sqlite3
import struct
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Generator
import sqlite_vec
from .base import VectorMatch, VectorStore
logger = logging.getLogger(__name__)
_SAFE_IDENTIFIER = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$")
def _serialize(vector: list[float]) -> bytes:
return struct.pack(f"<{len(vector)}f", *vector)
class LocalSQLiteVecStore(VectorStore):
"""
VectorStore backed by sqlite-vec virtual tables.
Uses two tables per logical store:
- ``<table>_vecs``: vec0 virtual table (rowid-indexed float vectors)
- ``<table>_meta``: companion table mapping rowid to string ID + JSON metadata
Args:
db_path: Path to SQLite database file.
table: Logical name prefix (default ``"vecs"``).
dimensions: Vector length; must match the embedding model (default 768).
"""
def __init__(
self,
db_path: str | Path,
table: str = "vecs",
dimensions: int = 768,
) -> None:
if not _SAFE_IDENTIFIER.match(table):
raise ValueError(
f"table must be a valid SQL identifier (letters, digits, underscores): {table!r}"
)
self.db_path = str(db_path)
self.table = table
self.dimensions = dimensions
self._init_tables()
@contextmanager
def _conn(self) -> Generator[sqlite3.Connection, None, None]:
conn = sqlite3.connect(self.db_path)
conn.enable_load_extension(True)
sqlite_vec.load(conn)
conn.enable_load_extension(False)
conn.row_factory = sqlite3.Row
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
def _init_tables(self) -> None:
with self._conn() as conn:
conn.execute(f"""
CREATE VIRTUAL TABLE IF NOT EXISTS {self.table}_vecs
USING vec0(embedding float[{self.dimensions}])
""")
conn.execute(f"""
CREATE TABLE IF NOT EXISTS {self.table}_meta (
rowid INTEGER PRIMARY KEY,
entry_id TEXT NOT NULL UNIQUE,
metadata TEXT NOT NULL DEFAULT '{{}}'
)
""")
def upsert(
self, entry_id: str, vector: list[float], metadata: dict[str, Any]
) -> None:
with self._conn() as conn:
row = conn.execute(
f"SELECT rowid FROM {self.table}_meta WHERE entry_id = ?", [entry_id]
).fetchone()
if row:
rowid = row["rowid"]
conn.execute(
f"UPDATE {self.table}_vecs SET embedding = ? WHERE rowid = ?",
[_serialize(vector), rowid],
)
conn.execute(
f"UPDATE {self.table}_meta SET metadata = ? WHERE rowid = ?",
[json.dumps(metadata), rowid],
)
else:
cursor = conn.execute(
f"INSERT INTO {self.table}_meta(entry_id, metadata) VALUES (?, ?)",
[entry_id, json.dumps(metadata)],
)
rowid = cursor.lastrowid
conn.execute(
f"INSERT INTO {self.table}_vecs(rowid, embedding) VALUES (?, ?)",
[rowid, _serialize(vector)],
)
def query(
self,
vector: list[float],
top_k: int = 10,
filter_metadata: dict[str, Any] | None = None,
) -> list[VectorMatch]:
with self._conn() as conn:
rows = conn.execute(
f"""
SELECT m.entry_id, v.distance, m.metadata
FROM {self.table}_vecs v
JOIN {self.table}_meta m ON m.rowid = v.rowid
WHERE v.embedding MATCH ? AND k = ?
ORDER BY v.distance
""",
[_serialize(vector), top_k],
).fetchall()
results = [
VectorMatch(
entry_id=r["entry_id"],
score=r["distance"],
metadata=json.loads(r["metadata"]),
)
for r in rows
]
if filter_metadata:
results = [
r
for r in results
if all(r.metadata.get(k) == v for k, v in filter_metadata.items())
]
return results
def delete(self, entry_id: str) -> None:
with self._conn() as conn:
row = conn.execute(
f"SELECT rowid FROM {self.table}_meta WHERE entry_id = ?", [entry_id]
).fetchone()
if row:
rowid = row["rowid"]
conn.execute(f"DELETE FROM {self.table}_vecs WHERE rowid = ?", [rowid])
conn.execute(f"DELETE FROM {self.table}_meta WHERE rowid = ?", [rowid])
def delete_where(self, filter_metadata: dict[str, Any]) -> int:
if not filter_metadata:
raise ValueError(
"delete_where requires a non-empty filter; refusing to delete entire store"
)
with self._conn() as conn:
rows = conn.execute(
f"SELECT rowid, metadata FROM {self.table}_meta"
).fetchall()
to_delete = [
r["rowid"]
for r in rows
if all(
json.loads(r["metadata"]).get(k) == v
for k, v in filter_metadata.items()
)
]
for rowid in to_delete:
conn.execute(f"DELETE FROM {self.table}_vecs WHERE rowid = ?", [rowid])
conn.execute(f"DELETE FROM {self.table}_meta WHERE rowid = ?", [rowid])
return len(to_delete)

View file

@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "circuitforge-core"
version = "0.18.0"
version = "0.19.0"
description = "Shared scaffold for CircuitForge products (MIT)"
requires-python = ">=3.11"
dependencies = [
@ -107,6 +107,14 @@ gestures-mediapipe = [
"opencv-python>=4.8",
"numpy>=1.24",
]
pdf = [
"pdfplumber>=0.11",
"pytesseract>=0.3",
"Pillow>=10.0",
]
vector = [
"sqlite-vec>=0.1",
]
dev = [
"circuitforge-core[manage]",
"pytest>=8.0",

View file

@ -0,0 +1,107 @@
# tests/test_documents/test_pdf.py
from __future__ import annotations
from unittest.mock import MagicMock, patch
import pytest
from circuitforge_core.documents.pdf import PDFExtractor, PageChunk
def _mock_page(text: str) -> MagicMock:
page = MagicMock()
page.extract_text.return_value = text
return page
def _mock_pdf(pages: list[MagicMock]) -> MagicMock:
pdf = MagicMock()
pdf.__enter__ = MagicMock(return_value=pdf)
pdf.__exit__ = MagicMock(return_value=False)
pdf.pages = pages
return pdf
def test_chunk_pages_single_text_layer_page():
page = _mock_page(
"Fireball deals 8d6 fire damage on a failed Dexterity saving throw."
)
with patch("circuitforge_core.documents.pdf.pdfplumber") as mock_pl:
mock_pl.open.return_value = _mock_pdf([page])
chunks = PDFExtractor().chunk_pages("/fake/book.pdf")
assert len(chunks) == 1
assert chunks[0].page_number == 1
assert chunks[0].source == "text_layer"
assert "Fireball" in chunks[0].text
assert chunks[0].word_count >= 10
def test_chunk_pages_numbers_from_one():
pages = [_mock_page(f"Rule text for page {i} " * 10) for i in range(1, 4)]
with patch("circuitforge_core.documents.pdf.pdfplumber") as mock_pl:
mock_pl.open.return_value = _mock_pdf(pages)
chunks = PDFExtractor().chunk_pages("/fake/book.pdf")
assert [c.page_number for c in chunks] == [1, 2, 3]
def test_page_chunk_is_frozen():
chunk = PageChunk(page_number=1, text="hello", source="text_layer", word_count=1)
with pytest.raises(Exception):
chunk.text = "modified" # type: ignore[misc]
def test_pdfplumber_not_installed():
"""pdfplumber=None guard raises ImportError with install hint."""
import circuitforge_core.documents.pdf as pdf_mod
with patch.object(pdf_mod, "pdfplumber", None):
with pytest.raises(ImportError, match="pdfplumber"):
PDFExtractor().chunk_pages("/fake/book.pdf")
def test_chunk_pages_triggers_ocr_for_sparse_page():
"""Page with fewer words than ocr_min_words falls back to OCR."""
sparse_page = _mock_page("few words only") # 3 words < default 10
mock_image = MagicMock()
rendered = MagicMock()
rendered.original = mock_image
sparse_page.to_image.return_value = rendered
with (
patch("circuitforge_core.documents.pdf.pdfplumber") as mock_pl,
patch("circuitforge_core.documents.pdf.pytesseract") as mock_tess,
patch("circuitforge_core.documents.pdf.Image") as mock_pil,
):
mock_pl.open.return_value = _mock_pdf([sparse_page])
mock_pil.open.return_value = mock_image
mock_tess.image_to_string.return_value = (
"Full OCR extracted rulebook text about saving throws."
)
chunks = PDFExtractor(ocr_min_words=10).chunk_pages("/fake/scan.pdf")
assert chunks[0].source == "ocr"
assert "OCR extracted" in chunks[0].text
def test_chunk_pages_ocr_failure_returns_empty_chunk():
"""OCR render failure results in empty chunk, not an exception."""
sparse_page = _mock_page("")
sparse_page.to_image.side_effect = RuntimeError("render failed")
with patch("circuitforge_core.documents.pdf.pdfplumber") as mock_pl:
mock_pl.open.return_value = _mock_pdf([sparse_page])
chunks = PDFExtractor().chunk_pages("/fake/broken.pdf")
assert len(chunks) == 1
assert chunks[0].text == ""
assert chunks[0].source == "ocr"
assert chunks[0].word_count == 0
def test_chunk_pages_empty_pdf_returns_empty_list():
with patch("circuitforge_core.documents.pdf.pdfplumber") as mock_pl:
mock_pl.open.return_value = _mock_pdf([])
chunks = PDFExtractor().chunk_pages("/fake/empty.pdf")
assert chunks == []

View file

@ -11,69 +11,81 @@ def _make_router(config: dict) -> LLMRouter:
def test_complete_uses_first_reachable_backend():
router = _make_router({
"fallback_order": ["local"],
"backends": {
"local": {
"type": "openai_compat",
"base_url": "http://localhost:11434/v1",
"model": "llama3",
"supports_images": False,
}
router = _make_router(
{
"fallback_order": ["local"],
"backends": {
"local": {
"type": "openai_compat",
"base_url": "http://localhost:11434/v1",
"model": "llama3",
"supports_images": False,
}
},
}
})
)
mock_client = MagicMock()
mock_client.chat.completions.create.return_value = MagicMock(
choices=[MagicMock(message=MagicMock(content="hello"))]
)
with patch.object(router, "_is_reachable", return_value=True), \
patch("circuitforge_core.llm.router.OpenAI", return_value=mock_client):
with (
patch.object(router, "_is_reachable", return_value=True),
patch("circuitforge_core.llm.router.OpenAI", return_value=mock_client),
):
result = router.complete("say hello")
assert result == "hello"
def test_complete_falls_back_on_unreachable_backend():
router = _make_router({
"fallback_order": ["unreachable", "working"],
"backends": {
"unreachable": {
"type": "openai_compat",
"base_url": "http://nowhere:1/v1",
"model": "x",
"supports_images": False,
router = _make_router(
{
"fallback_order": ["unreachable", "working"],
"backends": {
"unreachable": {
"type": "openai_compat",
"base_url": "http://nowhere:1/v1",
"model": "x",
"supports_images": False,
},
"working": {
"type": "openai_compat",
"base_url": "http://localhost:11434/v1",
"model": "llama3",
"supports_images": False,
},
},
"working": {
"type": "openai_compat",
"base_url": "http://localhost:11434/v1",
"model": "llama3",
"supports_images": False,
}
}
})
)
mock_client = MagicMock()
mock_client.chat.completions.create.return_value = MagicMock(
choices=[MagicMock(message=MagicMock(content="fallback"))]
)
def reachable(url):
return "nowhere" not in url
with patch.object(router, "_is_reachable", side_effect=reachable), \
patch("circuitforge_core.llm.router.OpenAI", return_value=mock_client):
with (
patch.object(router, "_is_reachable", side_effect=reachable),
patch("circuitforge_core.llm.router.OpenAI", return_value=mock_client),
):
result = router.complete("test")
assert result == "fallback"
def test_complete_raises_when_all_backends_exhausted():
router = _make_router({
"fallback_order": ["dead"],
"backends": {
"dead": {
"type": "openai_compat",
"base_url": "http://nowhere:1/v1",
"model": "x",
"supports_images": False,
}
router = _make_router(
{
"fallback_order": ["dead"],
"backends": {
"dead": {
"type": "openai_compat",
"base_url": "http://nowhere:1/v1",
"model": "x",
"supports_images": False,
}
},
}
})
)
with patch.object(router, "_is_reachable", return_value=False):
with pytest.raises(RuntimeError, match="exhausted"):
router.complete("test")
@ -83,6 +95,126 @@ def test_try_cf_orch_alloc_import_path():
"""Verify lazy import points to circuitforge_orch, not circuitforge_core.resources."""
import inspect
from circuitforge_core.llm import router as router_module
src = inspect.getsource(router_module.LLMRouter._try_cf_orch_alloc)
assert "circuitforge_orch.client" in src
assert "circuitforge_core.resources.client" not in src
def test_embed_returns_vectors_from_openai_compat_backend():
router = _make_router(
{
"fallback_order": ["ollama"],
"backends": {
"ollama": {
"type": "openai_compat",
"base_url": "http://localhost:11434/v1",
"model": "mistral:7b",
"embedding_model": "nomic-embed-text",
"supports_images": False,
}
},
}
)
mock_client = MagicMock()
mock_client.embeddings.create.return_value = MagicMock(
data=[
MagicMock(embedding=[0.1, 0.2, 0.3]),
MagicMock(embedding=[0.4, 0.5, 0.6]),
]
)
with (
patch.object(router, "_is_reachable", return_value=True),
patch("circuitforge_core.llm.router.OpenAI", return_value=mock_client),
):
result = router.embed(["hello world", "fireball rules"])
assert result == [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]]
mock_client.embeddings.create.assert_called_once_with(
model="nomic-embed-text",
input=["hello world", "fireball rules"],
)
def test_embed_uses_chat_model_when_no_embedding_model_configured():
router = _make_router(
{
"fallback_order": ["ollama"],
"backends": {
"ollama": {
"type": "openai_compat",
"base_url": "http://localhost:11434/v1",
"model": "llama3",
"supports_images": False,
}
},
}
)
mock_client = MagicMock()
mock_client.embeddings.create.return_value = MagicMock(
data=[MagicMock(embedding=[0.9, 0.8])]
)
with (
patch.object(router, "_is_reachable", return_value=True),
patch("circuitforge_core.llm.router.OpenAI", return_value=mock_client),
):
router.embed(["test"])
call_kwargs = mock_client.embeddings.create.call_args
assert call_kwargs.kwargs["model"] == "llama3"
def test_embed_skips_non_openai_compat_backends():
router = _make_router(
{
"fallback_order": ["anthropic", "ollama"],
"backends": {
"anthropic": {
"type": "anthropic",
"enabled": True,
"model": "claude-haiku-4-5-20251001",
"api_key_env": "ANTHROPIC_API_KEY",
"supports_images": True,
},
"ollama": {
"type": "openai_compat",
"base_url": "http://localhost:11434/v1",
"model": "nomic-embed-text",
"supports_images": False,
},
},
}
)
mock_client = MagicMock()
mock_client.embeddings.create.return_value = MagicMock(
data=[MagicMock(embedding=[0.1])]
)
mock_openai = MagicMock(return_value=mock_client)
with (
patch.object(router, "_is_reachable", return_value=True),
patch("circuitforge_core.llm.router.OpenAI", mock_openai),
):
result = router.embed(["hello"])
assert result == [[0.1]]
# Only ollama reached the OpenAI constructor; anthropic was skipped by type check
mock_openai.assert_called_once()
def test_embed_raises_when_all_backends_exhausted():
router = _make_router(
{
"fallback_order": ["dead"],
"backends": {
"dead": {
"type": "openai_compat",
"base_url": "http://nowhere:1/v1",
"model": "x",
"supports_images": False,
}
},
}
)
with patch.object(router, "_is_reachable", return_value=False):
with pytest.raises(RuntimeError, match="exhausted"):
router.embed(["test"])

View file

View file

@ -0,0 +1,102 @@
"""Tests for VectorStore ABC and VectorMatch."""
from __future__ import annotations
from dataclasses import FrozenInstanceError
import pytest
from circuitforge_core.vector.base import VectorMatch, VectorStore
class _ConcreteStore(VectorStore):
"""Minimal in-memory implementation for testing the ABC contract."""
def __init__(self) -> None:
self._data: dict[str, tuple[list[float], dict]] = {}
def upsert(self, entry_id: str, vector: list[float], metadata: dict) -> None:
self._data[entry_id] = (vector, metadata)
def query(
self,
vector: list[float],
top_k: int = 10,
filter_metadata: dict | None = None,
) -> list[VectorMatch]:
results = [
VectorMatch(entry_id=k, score=0.0, metadata=v[1])
for k, v in self._data.items()
]
if filter_metadata:
results = [
r
for r in results
if all(r.metadata.get(k) == val for k, val in filter_metadata.items())
]
return results[:top_k]
def delete(self, entry_id: str) -> None:
self._data.pop(entry_id, None)
def delete_where(self, filter_metadata: dict) -> int:
to_remove = [
k
for k, (_, meta) in self._data.items()
if all(meta.get(fk) == fv for fk, fv in filter_metadata.items())
]
for k in to_remove:
del self._data[k]
return len(to_remove)
def test_vector_match_is_frozen():
match = VectorMatch(entry_id="a", score=0.1, metadata={})
with pytest.raises(FrozenInstanceError):
match.score = 0.5 # type: ignore[misc]
def test_vector_match_metadata_is_dict():
match = VectorMatch(entry_id="a", score=0.1, metadata={"k": "v"})
assert isinstance(match.metadata, dict)
assert match.metadata["k"] == "v"
def test_upsert_and_query():
store = _ConcreteStore()
store.upsert("chunk-1", [0.1, 0.2], {"doc_id": "book-a", "page": 1})
results = store.query([0.1, 0.2])
assert len(results) == 1
assert results[0].entry_id == "chunk-1"
assert results[0].metadata["page"] == 1
def test_query_filter_metadata():
store = _ConcreteStore()
store.upsert("c1", [0.1], {"doc_id": "book-a"})
store.upsert("c2", [0.2], {"doc_id": "book-b"})
results = store.query([0.1], filter_metadata={"doc_id": "book-a"})
assert len(results) == 1
assert results[0].entry_id == "c1"
def test_delete():
store = _ConcreteStore()
store.upsert("x", [0.1], {})
store.delete("x")
assert store.query([0.1]) == []
def test_delete_where():
store = _ConcreteStore()
store.upsert("c1", [0.1], {"doc_id": "book-a"})
store.upsert("c2", [0.2], {"doc_id": "book-a"})
store.upsert("c3", [0.3], {"doc_id": "book-b"})
count = store.delete_where({"doc_id": "book-a"})
assert count == 2
assert len(store.query([0.1])) == 1
def test_cannot_instantiate_abc_directly():
with pytest.raises(TypeError):
VectorStore() # type: ignore[abstract]

View file

@ -0,0 +1,82 @@
# tests/test_vector/test_sqlite_vec.py
"""Integration tests for LocalSQLiteVecStore (uses a real in-memory sqlite-vec DB)."""
from __future__ import annotations
import pytest
from circuitforge_core.vector.sqlite_vec import LocalSQLiteVecStore
DIMS = 4 # small dimension for tests
@pytest.fixture
def store(tmp_path) -> LocalSQLiteVecStore:
return LocalSQLiteVecStore(db_path=tmp_path / "vecs.db", dimensions=DIMS)
def _vec(val: float) -> list[float]:
return [val] * DIMS
def test_upsert_and_query_returns_match(store):
store.upsert("doc-1::p1", _vec(0.1), {"doc_id": "doc-1", "page": 1})
results = store.query(_vec(0.1), top_k=5)
assert len(results) == 1
assert results[0].entry_id == "doc-1::p1"
assert results[0].metadata["page"] == 1
def test_upsert_replaces_existing(store):
store.upsert("chunk-1", _vec(0.1), {"page": 1})
store.upsert("chunk-1", _vec(0.9), {"page": 99})
# Metadata check
results = store.query(_vec(0.9), top_k=5)
assert results[0].metadata["page"] == 99
# Vector check: querying with new vector should score better than querying with old
old_results = store.query(_vec(0.1), top_k=5)
new_results = store.query(_vec(0.9), top_k=5)
assert new_results[0].score < old_results[0].score
def test_query_respects_top_k(store):
for i in range(5):
store.upsert(f"chunk-{i}", _vec(float(i) * 0.1), {"i": i})
results = store.query(_vec(0.0), top_k=2)
assert len(results) == 2
def test_filter_metadata(store):
store.upsert("c1", _vec(0.1), {"doc_id": "book-a"})
store.upsert("c2", _vec(0.2), {"doc_id": "book-b"})
results = store.query(_vec(0.1), filter_metadata={"doc_id": "book-a"})
assert all(r.metadata["doc_id"] == "book-a" for r in results)
def test_delete(store):
store.upsert("x", _vec(0.5), {})
store.delete("x")
assert store.query(_vec(0.5)) == []
def test_delete_where(store):
store.upsert("c1", _vec(0.1), {"doc_id": "book-a"})
store.upsert("c2", _vec(0.2), {"doc_id": "book-a"})
store.upsert("c3", _vec(0.3), {"doc_id": "book-b"})
count = store.delete_where({"doc_id": "book-a"})
assert count == 2
assert len(store.query(_vec(0.1))) == 1
def test_delete_nonexistent_is_noop(store):
store.delete("does-not-exist") # should not raise
def test_empty_query_returns_empty(store):
assert store.query(_vec(0.1)) == []
def test_delete_where_raises_on_empty_filter(store):
store.upsert("c1", _vec(0.1), {"doc_id": "book-a"})
with pytest.raises(ValueError, match="empty"):
store.delete_where({})