Compare commits

...

32 commits

Author SHA1 Message Date
5a363f3b6c fix(video): add torchvision to video-marlin extras
torchvision was missing from the initial extras list; QwenVLVideoProcessor
requires it at inference time. On CUDA 13 nodes, install it from the PyTorch
nightly cu130 index to avoid a torch version downgrade:
  pip install --index-url https://download.pytorch.org/whl/nightly/cu130 torch torchvision

Discovered during Muninn deployment (2026-05-26).
2026-06-02 20:32:03 -07:00
a7d916f630 docs: add LLM development disclosure to README
Humans own design, architecture, code review, testing, and
verification. LLMs are part of our development workflow.
Links to circuitforge.tech/positions for our full position.
2026-05-28 08:20:17 -07:00
c2ac55259d fix(video): enforce PCI_BUS_ID order + force CUDA_VISIBLE_DEVICES assignment
CUDA defaults to FASTEST_FIRST device ordering, which does not match
nvidia-smi's PCI bus order on multi-GPU nodes. On Muninn, the RTX 3090
is cuda:0 and the Quadro RTX 4000 is cuda:1 — the opposite of nvidia-smi.

Two fixes:
1. Set CUDA_DEVICE_ORDER=PCI_BUS_ID so --gpu-id always matches nvidia-smi
   and the muninn.yaml profile GPU index assignments.
2. Use direct assignment (os.environ[...] = ...) instead of setdefault —
   setdefault silently no-ops if CUDA_VISIBLE_DEVICES is already present
   in the environment (conda activation, prior run, system default).
2026-05-26 15:07:30 -07:00
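
The difference between `setdefault` and direct assignment described in fix 2 can be sketched as follows. This is a minimal illustration, not the project's code: `pin_gpu` is a hypothetical helper name, and the sketch assumes it runs before anything in the process initializes CUDA, since the runtime reads these variables at initialization.

```python
import os


def pin_gpu(gpu_id: int) -> None:
    """Hypothetical helper: pin this process to one GPU by nvidia-smi index."""
    # Enumerate devices in PCI bus order so indices match nvidia-smi.
    os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
    # Direct assignment overwrites any value inherited from the environment.
    os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)


# Simulate a stale value left behind by a shell profile or a prior run.
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

# setdefault keeps the existing value, so the request for GPU 1 is lost.
os.environ.setdefault("CUDA_VISIBLE_DEVICES", "1")
after_setdefault = os.environ["CUDA_VISIBLE_DEVICES"]

# Direct assignment takes effect regardless of what was there before.
pin_gpu(1)
after_pin = os.environ["CUDA_VISIBLE_DEVICES"]
```

Here `after_setdefault` still holds the stale value while `after_pin` holds the requested index, which is the silent no-op the commit message describes.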
9f7fb45071 feat(video): add cf-video module — Marlin-2B FastAPI service + mock backend + tests
Add the circuitforge_core.video package implementing the cf-video inference
service managed by cf-orch.

Service endpoints:
  GET  /health     — liveness check; model name + VRAM
  POST /caption    — dense scene description + timestamped event list
  POST /find       — temporal grounding of a natural-language event query

Backend hierarchy:
  VideoBackend (Protocol)
    MarlinBackend  — NemoStation/Marlin-2B via transformers>=5.7.0
    MockVideoBackend — deterministic stub; no GPU required

Pydantic request/response models enforce parameter bounds at the API
boundary (max_new_tokens ge/le, event min_length=1).  Span is serialized
as list[float] | None for JSON compatibility.

MarlinBackend loads the model eagerly in __init__ so that cf-orch's 2-second
liveness poll catches load failures immediately.  The FORCE_QWENVL_VIDEO_READER
env var is defaulted to torchcodec (faster than the av path) before transformers
is imported.

pyproject.toml extras:
  video-marlin   — torch, transformers, torchcodec, qwen-vl-utils, av, Pillow
  video-service  — video-marlin + fastapi + uvicorn

Test coverage: 46 tests across test_mock_backend.py and test_app.py.
All passing without GPU or real video file.

Closes: #71
2026-05-25 20:00:37 -07:00
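
The backend hierarchy in this commit (a `Protocol` with a real and a mock implementation) can be sketched roughly as below. The method names `caption` and `find`, their parameters, and the fake outputs are assumptions chosen to mirror the `/caption` and `/find` endpoints; the commit message does not show the actual signatures. Only the `list[float] | None` span shape is taken from the message.

```python
from typing import Optional, Protocol, runtime_checkable


@runtime_checkable
class VideoBackend(Protocol):
    """Structural interface; method names and signatures are assumptions."""

    def caption(self, video_path: str, max_new_tokens: int = 256) -> str: ...

    def find(self, video_path: str, event: str) -> Optional[list[float]]: ...


class MockVideoBackend:
    """Deterministic stub: same inputs always give the same outputs, no GPU."""

    def caption(self, video_path: str, max_new_tokens: int = 256) -> str:
        return f"mock caption for {video_path}"

    def find(self, video_path: str, event: str) -> Optional[list[float]]:
        # An empty query has nothing to ground, so report "no span".
        if not event:
            return None
        # Derive a stable fake [start, end] span from the query length.
        start = float(len(event) % 10)
        return [start, start + 1.0]


backend: VideoBackend = MockVideoBackend()
span = backend.find("clip.mp4", "door opens")
```

Because the protocol is structural, `MockVideoBackend` does not need to inherit from `VideoBackend`; any class with matching methods satisfies it, which is what lets tests run without a GPU.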
93d36346c1 feat(llm): task-based cf-orch allocation in LLMRouter (v0.21.0)
_try_cf_orch_alloc now checks for cf_orch.task + cf_orch.product keys.
When present, uses client.task_allocate(product, task) instead of
service-based allocate(). Supports peregrine#115 task-model routing.
Existing service-based configs are unaffected.
2026-05-17 19:59:48 -07:00
af66877b51 feat(community): recipe dedup support — similar_to_ref FK, search_similar_posts, migration 006
Adds three-layer dedup infrastructure for community recipe posts:
- Migration 006: similar_to_ref self-FK, title lower() index, recipe_id index
- CommunityPost.similar_to_ref optional field (frozen dataclass, defaults None)
- SharedStore.search_similar_posts(): title ILIKE + recipe_id match, ordered by relevance
- insert_post() wires similar_to_ref into the INSERT
2026-05-11 17:09:18 -07:00
41c9830281 docs(readme): landing page rewrite — v0.20.0, all 28 modules, LLM router + DB usage examples, full extras install table, used-by table
2026-05-06 08:51:54 -07:00
fb3a4c697d feat(llm): v0.20.0 — LLMRouter dict init + Ollama embed preflight (closes #59, #60)
- LLMRouter.__init__ now accepts a Path | dict; pagepiper ingest scripts
  pass a runtime-constructed config dict instead of a temp file
- _check_ollama_model_pulled() preflight on embed(): checks /api/tags once
  per backend URL and raises RuntimeError("...Fix: ollama pull <model>")
  when the configured embedding model is not pulled; silently skips for
  non-Ollama backends (vLLM, etc.) that don't expose /api/tags
- 6 new tests: dict init paths (x2) + preflight scenarios (x4)
- Existing embed tests updated to mock requests.get to avoid live Ollama calls
2026-05-05 14:59:49 -07:00
ccc6a15d94 feat: cf-core v0.19.0 — add PDF extraction, VectorStore, LLMRouter.embed()
2026-05-04 16:11:57 -07:00
0ddb3cbf07 chore: bump cf-core to v0.19.0 (add pdf, vector, llm.embed) 2026-05-04 16:04:48 -07:00
7526092481 fix(llm): strengthen embed skip-verification test; add DEMO_MODE check to embed() 2026-05-04 16:02:26 -07:00
8e2d15bcd4 feat(llm): add LLMRouter.embed() for batch embedding generation
Adds embed(texts, model_override, fallback_order) to LLMRouter. Only
openai_compat backends are tried (Ollama/vLLM expose /v1/embeddings;
anthropic and vision_service do not). Uses embedding_model from backend
config when present, falls back to the chat model otherwise. Supports
cf-orch allocation and raises RuntimeError when all backends are exhausted.

4 tests added (TDD: RED → GREEN), 763 total passing, no regressions.
2026-05-04 15:58:44 -07:00
a6d906bcbb fix(vector): explicit rollback, table identifier guard, query scope fix 2026-05-04 15:55:05 -07:00
0489f1111c feat(vector): add LocalSQLiteVecStore backed by sqlite-vec
Implements the VectorStore ABC using sqlite-vec virtual tables.
Two-table design (vec0 virtual + companion meta) supports upsert,
top-k ANN query with optional metadata post-filter, delete by ID,
and bulk delete_where. Also renames VectorMatch.id → entry_id to
avoid shadowing the Python builtin, updating base.py and all tests.

Installed: sqlite-vec 0.1.9
Tests: 16 passed (7 base + 9 integration)
2026-05-04 15:41:39 -07:00
e6c69f25ae fix(vector): rename VectorMatch.entry_id to id per downstream contract
VectorMatch.entry_id renamed to VectorMatch.id to match the API contract
expected by downstream consumers (pagepiper T7). The dataclass remains frozen
to prevent field reassignment; metadata is kept as plain dict for JSON
deserialization compatibility.

- Renamed VectorMatch.entry_id field to id
- Updated all test references to use .id accessor
- Simplified metadata to plain dict (removed MappingProxyType wrapping)
- All 7 tests passing
2026-05-04 14:19:14 -07:00
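
What "frozen with a plain dict" does and does not guarantee can be shown with a short sketch. Only the `id` and `metadata` fields come from the commit message; the `score` field and the sample values are assumptions for illustration.

```python
from dataclasses import FrozenInstanceError, dataclass, field


@dataclass(frozen=True)
class VectorMatch:
    id: str
    score: float  # assumed field, not confirmed by the commit message
    metadata: dict = field(default_factory=dict)


match = VectorMatch(id="doc-1", score=0.92, metadata={"page": 3})

# Rebinding a field on a frozen dataclass raises FrozenInstanceError.
try:
    match.id = "doc-2"  # type: ignore[misc]
    reassigned = True
except FrozenInstanceError:
    reassigned = False

# The dict itself stays mutable; frozen only guards attribute rebinding.
match.metadata["page"] = 4
```

This is the trade-off the commit accepts: dropping the `MappingProxyType` wrapper keeps `metadata` JSON-friendly at the cost of allowing in-place mutation.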
9492942623 fix(vector): make VectorMatch.metadata immutable; rename id to entry_id 2026-05-04 11:46:24 -07:00
fe51914902 feat(vector): add VectorStore ABC and VectorMatch dataclass 2026-05-04 11:42:03 -07:00
ac45067ae7 test(documents): add OCR fallback and edge case tests for PDFExtractor 2026-05-04 08:45:53 -07:00
408ab64c55 test(documents): add OCR and ImportError coverage for PDFExtractor
- Add module-level guards for pytesseract and PIL.Image (enables patching in tests)
- Move `import io` from inside _ocr_page to module-level stdlib imports
- Extract _ensure_pil_image() helper with TypeError guard so isinstance check
  does not blow up when Image is patched to a MagicMock in tests
- Add 3 new tests: pdfplumber=None ImportError, sparse-page OCR fallback,
  OCR render failure returns empty chunk
- Coverage: 96% (up from 64%)
2026-05-04 08:39:31 -07:00
bbb146b361 feat(documents): add PDFExtractor text-layer extraction and PageChunk
Adds circuitforge_core/documents/pdf.py with:
- PageChunk frozen dataclass (page_number, text, source, word_count)
- PDFExtractor.chunk_pages() — pdfplumber text-layer per page, OCR fallback via pytesseract for sparse pages
- Module-level graceful ImportError guard on pdfplumber (patchable, follows cf-core optional-extra pattern)
- pdf and pdf-ocr optional extras declared in pyproject.toml

3 tests, all passing.
2026-05-04 08:33:10 -07:00
3be21ce452 chore: gitignore .worktrees directory 2026-05-04 08:23:39 -07:00
73f694ed3a fix(input/gestures): restore Iterator[np.ndarray] return type on frames() 2026-04-26 20:48:50 -07:00
0f5ea86ab0 fix(input/gestures): enforce numpy array immutability in HandLandmarks; add CameraCapture tests
- Set points.flags.writeable = False in HandsDetector.detect() so in-place
  mutation of HandLandmarks.points raises ValueError (frozen=True alone does not
  protect numpy array contents)
- Extend test_handlandmarks_is_immutable to assert ValueError on array mutation
- Add test_camera.py with 3 tests covering is_open, frames() yield/break
  behaviour, and context manager release (was at 0% coverage)
- Remove unused `import numpy as np` from camera.py; fix frames() return
  annotation to Iterator (np.ndarray ref removed with the import)
2026-04-26 20:48:02 -07:00
cb3d186a58 chore: bump cf-core to v0.18.0 — adds cf_input.gestures module
2026-04-26 20:20:28 -07:00
a62bff5f1e test(input/gestures): add full pipeline smoke test 2026-04-26 20:18:40 -07:00
524cc62812 feat(input/gestures): add CameraCapture and public __init__ exports 2026-04-26 20:16:18 -07:00
a31e6099c6 feat(input/gestures): implement HandsDetector wrapping mediapipe Hands 2026-04-26 20:08:05 -07:00
5a4917d455 style: black format normalizer.py and test_normalizer.py 2026-04-26 20:05:54 -07:00
460530bb03 feat(input/gestures): implement normalize_hand() with scale/translation invariance 2026-04-26 19:58:00 -07:00
b2b58913c7 feat: scaffold cf_input.gestures module + gestures-mediapipe dep group 2026-04-26 18:51:45 -07:00
185057d8ca feat(reranker): full adapter suite + cf-orch auto-routing (closes #54)
Five backends: BGE (FlagEmbedding), Qwen3 (generative yes/no logit scorer,
batched forward pass), CrossEncoder (sentence-transformers, covers mxbai-rerank
/ ms-marco / jina), Cohere (BYOK cloud), Remote (HTTP delegate to cf-reranker
service). Mock adapter for tests. 54 tests.

cf-reranker FastAPI service app (port 8011) — cf-orch manages as a process,
defaults to Qwen3-Reranker-0.6B.

make_reranker() auto-detects CF_ORCH_URL and routes to cf-orch cf-reranker
when set — cloud apps (Kiwi, Peregrine, Snipe) get remote Qwen3 reranking
with zero code changes. Local dev falls back to local BGE.

pyproject extras: reranker-bge, reranker-qwen3, reranker-cross-encoder,
reranker-cohere, reranker-service.
2026-04-26 09:04:39 -07:00
b21d6acc8e feat: add detect_byok() public utility to cloud_session, bump v0.16.1
Extracted from kiwi/avocet where it was duplicated. Reads llm.yaml via
the same path LLMRouter uses — products can now import detect_byok from
cf-core instead of maintaining their own copy.
2026-04-25 16:01:05 -07:00
48 changed files with 4094 additions and 97 deletions

1
.gitignore vendored

@ -11,3 +11,4 @@ build/
# cf-orch private profiles (commit on personal/heimdall branch only)
circuitforge_core/resources/profiles/private/
.worktrees/


@ -6,6 +6,49 @@ Versions follow [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
---
## [0.20.0] — 2026-05-05
### Fixed / Enhanced
**`circuitforge_core.llm.LLMRouter`** — Pagepiper-driven improvements (closes #59, #60)
- **#59 — dict init** (`LLMRouter(config_path: Path | dict)`): `__init__` now accepts an inline config dict in addition to a `Path`. Ingest scripts that construct Ollama URLs from product-specific env vars (e.g. `PAGEPIPER_OLLAMA_URL`) can pass the dict directly without writing a temp file. Passing a dict previously raised `AttributeError: 'dict' object has no attribute 'exists'`. Tests: `test_init_accepts_inline_dict`, `test_init_dict_is_used_directly`.
- **#60 — Ollama preflight** (`_check_ollama_model_pulled()`): Before the first `embed()` call on an Ollama backend, `GET /api/tags` is checked to verify the configured embedding model is pulled. If it is not, a `RuntimeError` with an actionable `ollama pull <model>` hint is raised immediately — replacing the opaque `All LLM backends exhausted for embed()` error. Results are cached per base URL for the router's lifetime (one HTTP call, not one per `embed()` invocation). Non-Ollama backends (vLLM, etc.) don't expose `/api/tags` — a non-200 response causes the check to be silently skipped. Tests: `test_embed_raises_actionable_error_when_model_not_pulled`, `test_embed_proceeds_when_model_is_pulled`, `test_embed_skips_preflight_when_tags_endpoint_unavailable`, `test_ollama_tags_cache_is_hit_only_once`.
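
The cached preflight described in #60 can be sketched as follows. This is an illustration under stated assumptions, not the router's implementation: `OllamaPreflight` and `fetch_tags` are hypothetical names, the HTTP call is injected as a callable so the sketch runs without a live server, and model names are compared by exact match (the real check may normalize tags such as `:latest`). The `{"models": [{"name": ...}]}` shape follows Ollama's `/api/tags` response.

```python
from typing import Callable, Optional


class OllamaPreflight:
    """Sketch of a cached model-availability check (names are hypothetical)."""

    def __init__(self, fetch_tags: Callable[[str], Optional[dict]]) -> None:
        # fetch_tags(base_url) returns the parsed /api/tags JSON,
        # or None when the endpoint does not answer with HTTP 200.
        self._fetch_tags = fetch_tags
        self._cache: dict = {}

    def check(self, base_url: str, model: str) -> None:
        # Fetch at most once per base URL for the lifetime of this object.
        if base_url not in self._cache:
            payload = self._fetch_tags(base_url)
            self._cache[base_url] = (
                None
                if payload is None
                else {m["name"] for m in payload.get("models", [])}
            )
        pulled = self._cache[base_url]
        if pulled is None:
            return  # no /api/tags endpoint: skip the check silently
        if model not in pulled:
            raise RuntimeError(
                f"Embedding model {model!r} is not pulled. Fix: ollama pull {model}"
            )


calls: list = []


def fake_fetch(base_url: str) -> Optional[dict]:
    """Stand-in for the HTTP request; records each call."""
    calls.append(base_url)
    return {"models": [{"name": "nomic-embed-text:latest"}]}


preflight = OllamaPreflight(fake_fetch)
preflight.check("http://localhost:11434", "nomic-embed-text:latest")
preflight.check("http://localhost:11434", "nomic-embed-text:latest")
```

After the two `check()` calls, `calls` contains a single entry, matching the "one HTTP call, not one per `embed()` invocation" behaviour noted above.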
---
## [0.17.0] — 2026-04-27
### Added
**`circuitforge_core.reranker`** — shared reranker module for RAG pipelines across the orchard (MIT, closes #54)
Five adapters covering local and cloud paths, plus a mock adapter for tests:
- `adapters/bge.py` (`BGETextReranker`): FlagEmbedding cross-encoder (`BAAI/bge-reranker-*`). Batches all pairs in a single `compute_score()` call via `rerank_batch()`. Thread-safe with internal lock. Free tier.
- `adapters/qwen3.py` (`Qwen3TextReranker`): generative reranker using `AutoModelForCausalLM`. Scores by reading yes/no token logits at the last input position after pre-filling the assistant `<think>\n\n</think>` block: one forward pass per batch, no generation loop. Left-pads for consistent last-token position across the batch. Free / Paid tier.
- `adapters/cross_encoder.py` (`CrossEncoderTextReranker`): sentence-transformers `CrossEncoder`. Broader model coverage: `mxbai-rerank-*`, `ms-marco-MiniLM-*`, `jina-reranker-*`. Free tier.
- `adapters/cohere.py` (`CohereTextReranker`): Cohere Rerank API (BYOK cloud path). Reads `COHERE_API_KEY` from env or explicit `api_key=` arg. Restores original candidate order from Cohere's score-sorted response. Paid / BYOK.
- `adapters/remote.py` (`RemoteTextReranker`): HTTP delegate to a cf-reranker service endpoint. `from_cf_orch()` classmethod allocates via cf-orch on demand. `release()` method returns the lease.
- `adapters/mock.py` (`MockTextReranker`): Jaccard-similarity scorer, no model required. Used in tests and `CF_RERANKER_MOCK=1` mode.
`app.py`: `cf-reranker` FastAPI service (port 8011). Managed by cf-orch as a process-type service. Exposes `GET /health` and `POST /rerank`. Defaults to `Qwen3-Reranker-0.6B`.
**Auto cf-orch routing:** `make_reranker()` checks `CF_ORCH_URL` at construction time. When set (cloud deployments), it automatically allocates a `cf-reranker` service via cf-orch and returns a `RemoteTextReranker` — no code changes needed in Kiwi, Peregrine, or Snipe. Local dev (no `CF_ORCH_URL`) falls back to local BGE inference.
**Public API:**
- `rerank(query, candidates, top_n)` — process-level singleton, mock-safe
- `make_reranker(model_id, backend, mock)` — explicit instance
- `reset_reranker()` — test teardown only
- `RerankResult(candidate, score, rank)` — frozen dataclass result type
**`pyproject.toml` extras:** `reranker-bge`, `reranker-qwen3`, `reranker-cross-encoder`, `reranker-cohere`, `reranker-service`
54 tests across all adapters.
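
To make the mock adapter's scoring concrete, here is a minimal sketch of a Jaccard-similarity reranker that returns `RerankResult(candidate, score, rank)` values. It is an illustration, not the shipped `MockTextReranker`: the function names, whitespace tokenization, string-typed candidates, and 1-based `rank` are all assumptions.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class RerankResult:
    candidate: str
    score: float
    rank: int  # 1-based here; the real numbering is not stated in the changelog


def jaccard(a: str, b: str) -> float:
    """Token-set overlap: intersection size over union size (0.0 if both empty)."""
    sa, sb = set(a.lower().split()), set(b.lower().split())
    union = sa | sb
    return len(sa & sb) / len(union) if union else 0.0


def mock_rerank(
    query: str, candidates: list, top_n: Optional[int] = None
) -> list:
    """Score every candidate against the query and return them best-first."""
    scored = [(jaccard(query, c), c) for c in candidates]
    # sorted() is stable, so ties keep their original input order.
    scored.sort(key=lambda pair: pair[0], reverse=True)
    if top_n is not None:
        scored = scored[:top_n]
    return [
        RerankResult(candidate=c, score=s, rank=i)
        for i, (s, c) in enumerate(scored, start=1)
    ]


results = mock_rerank(
    "spicy tomato soup",
    ["tomato soup", "chocolate cake", "spicy tomato soup with basil"],
)
```

A scorer like this needs no model weights, which is why it suits unit tests: the ranking depends only on token overlap and is fully deterministic.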
---
## [0.14.0] — 2026-04-20
### Added

193
README.md

@ -1,37 +1,186 @@
# circuitforge-core
<p align="center">
<img src="docs/cf-logo.png" alt="CircuitForge logo" width="120" />
</p>
Shared scaffold for CircuitForge products.
<h1 align="center">circuitforge-core</h1>
**Current version: 0.7.0**
<p align="center">Shared Python scaffold for privacy-first, self-hosted AI tools</p>
## Modules
<p align="center">
<a href="LICENSE"><img src="https://img.shields.io/badge/license-MIT-green.svg" alt="MIT License" /></a>
<img src="https://img.shields.io/badge/version-0.20.0-blue.svg" alt="v0.20.0" />
<img src="https://img.shields.io/badge/python-3.11%2B-blue.svg" alt="Python 3.11+" />
<a href="https://git.opensourcesolarpunk.com/Circuit-Forge/circuitforge-core"><img src="https://img.shields.io/badge/repo-Forgejo-orange.svg" alt="Forgejo" /></a>
</p>
### Implemented
---
- `circuitforge_core.db` — SQLite connection factory and migration runner
- `circuitforge_core.llm` — LLM router with fallback chain (Ollama, vLLM, Anthropic, OpenAI-compatible)
- `circuitforge_core.tiers` — Tier system with BYOK and local vision unlocks
- `circuitforge_core.config` — Env validation and .env loader
- `circuitforge_core.hardware` — Hardware detection and LLM backend profile generation (VRAM tiers, GPU/CPU auto-select)
- `circuitforge_core.documents` — Document ingestion pipeline: PDF, DOCX, and image OCR → `StructuredDocument`
- `circuitforge_core.affiliates` — Affiliate URL wrapping with opt-out, BYOK user IDs, and CF env-var fallback (`wrap_url`)
- `circuitforge_core.preferences` — User preference store (local YAML file, pluggable backend); dot-path get/set API
- `circuitforge_core.tasks` — VRAM-aware LLM task scheduler; shared slot manager across services (`TaskScheduler`)
- `circuitforge_core.manage` — Cross-platform product process manager (Docker and native modes)
- `circuitforge_core.resources` — Resource coordinator and agent: VRAM allocation, eviction engine, GPU profile registry
## Why circuitforge-core?
### Stubs (in-tree, not yet implemented)
- **Local inference first.** The LLM router defaults to Ollama on localhost. Cloud APIs are a configurable fallback, not the default path. No telemetry, no round-trips you didn't ask for.
- **VRAM-aware scheduling.** The task scheduler and resource coordinator track GPU memory across concurrent services, allocate slots before loading models, and evict backends gracefully when VRAM is scarce.
- **Consistent tier system across products.** One `tiers` module handles Free / Paid / Premium / Ultra tiers, BYOK (bring your own key) unlocks, and local-vision capability gates — the same way in every product.
- **Uniform developer experience.** DB migrations, config validation, document ingestion, process management, and preference storage all share a single, tested implementation. Products extend, not reimplement.
- `circuitforge_core.vision` — Vision router base class (planned: moondream2 / Claude vision dispatch)
- `circuitforge_core.wizard` — First-run wizard base class (products subclass `BaseWizard`)
- `circuitforge_core.pipeline` — Staging queue base (`StagingDB`; products provide concrete schema)
---
## Install
```bash
pip install -e .
# From PyPI
pip install circuitforge-core
# Editable install from source (recommended for product development)
pip install -e /path/to/circuitforge-core
# With optional extras
pip install circuitforge-core[pdf] # PDF/DOCX/OCR document ingestion
pip install circuitforge-core[vector] # SQLite-vec vector store
pip install circuitforge-core[text-transformers] # Local transformer inference (cf-text)
pip install circuitforge-core[stt-faster-whisper] # Speech-to-text via Faster Whisper
pip install circuitforge-core[tts-chatterbox] # Text-to-speech via Chatterbox
pip install circuitforge-core[reranker-qwen3] # Reranking via Qwen3
pip install circuitforge-core[community] # PostgreSQL-backed community store
pip install circuitforge-core[manage] # cf-manage CLI (Typer)
pip install circuitforge-core[dev] # All dev dependencies
```
---
## Modules
| Module | Status | Description |
|---|---|---|
| `db` | Implemented | SQLite connection factory and migration runner |
| `llm` | Implemented | LLM router with priority fallback chain (Ollama, vLLM, Anthropic, OpenAI-compatible) |
| `tiers` | Implemented | Tier system with BYOK and local-vision unlocks (Free / Paid / Premium / Ultra) |
| `config` | Implemented | Env validation and `.env` loader with startup fail-fast |
| `hardware` | Implemented | GPU/CPU detection, VRAM profiling, backend profile generation |
| `documents` | Implemented | PDF, DOCX, and image OCR ingestion into `StructuredDocument` |
| `affiliates` | Implemented | Affiliate URL wrapping with per-user opt-out and env-var fallback |
| `preferences` | Implemented | User preference store — local YAML with pluggable backend; dot-path get/set |
| `tasks` | Implemented | VRAM-aware LLM task scheduler; shared slot manager across services |
| `manage` | Implemented | Cross-platform product process manager (Docker and native modes) |
| `resources` | Implemented | VRAM allocation, eviction engine, GPU profile registry |
| `text` | Implemented | Text processing utilities and local transformer inference service |
| `activitypub` | Implemented | ActivityPub actor, inbox, delivery, and Lemmy federation primitives |
| `audio` | Implemented | Audio buffer, format conversion, resampling, and VAD (voice activity detection) gate |
| `stt` | Implemented | Speech-to-text service (Faster Whisper backend) |
| `tts` | Implemented | Text-to-speech service (Chatterbox backend) |
| `musicgen` | Implemented | Music generation service (AudioCraft/MusicGen backend) |
| `reranker` | Implemented | Result reranking — BGE, Qwen3, cross-encoder, and Cohere adapters |
| `vector` | Implemented | SQLite-vec vector store with pluggable embedding backend |
| `api` | Implemented | Shared API helpers — corrections and feedback endpoints |
| `community` | Implemented | Community feed and social store (PostgreSQL-backed) |
| `platforms` | Implemented | Platform-specific integrations (eBay) |
| `cloud_session` | Implemented | Cloud session management primitives |
| `input` | Implemented | Input handling — MediaPipe gesture recognition |
| `job_quality` | Implemented | Job listing quality scoring and signal extraction |
| `vision` | Stub | Vision router (moondream2 / SigLIP dispatch — planned) |
| `wizard` | Stub | First-run wizard base class — products subclass `BaseWizard` |
| `pipeline` | Stub | Staging queue base — products provide concrete schema |
---
## Usage: LLM Router
The LLM router reads a config file at `~/.config/circuitforge/llm.yaml`, tries each backend in fallback order, and skips unreachable or disabled entries transparently.
```python
from circuitforge_core.llm import LLMRouter
# Auto-detects from env vars when llm.yaml is absent:
# ANTHROPIC_API_KEY, OPENAI_API_KEY / OPENAI_BASE_URL, OLLAMA_HOST
router = LLMRouter()
response = router.complete(
messages=[{"role": "user", "content": "Summarize this in one sentence."}],
system="You are a concise assistant.",
)
print(response)
```
**Example `llm.yaml`** (Ollama local, Anthropic cloud fallback):
```yaml
fallback_order:
- ollama
- anthropic
backends:
ollama:
type: openai_compat
enabled: true
base_url: http://localhost:11434/v1
model: llama3.2:3b
anthropic:
type: anthropic
enabled: true
model: claude-haiku-4-5-20251001
api_key_env: ANTHROPIC_API_KEY
supports_images: true
```
---
## Usage: Database + Migrations
```python
from circuitforge_core.db import get_connection, run_migrations
from pathlib import Path
# Run product migrations on startup
run_migrations(db_path=Path("data/app.db"), migrations_dir=Path("db/migrations"))
# Get a connection anywhere in your app
with get_connection(Path("data/app.db")) as conn:
conn.execute("INSERT INTO items (name) VALUES (?)", ("example",))
```
---
## Used by
| Product | Description |
|---|---|
| [peregrine](https://git.opensourcesolarpunk.com/Circuit-Forge/peregrine) | Job search — discovery, cover letters, interview prep |
| [snipe](https://git.opensourcesolarpunk.com/Circuit-Forge/snipe) | Auction sniping — eBay trust scoring, bid timing |
| [kiwi](https://git.opensourcesolarpunk.com/Circuit-Forge/kiwi) | Pantry tracker with barcode/receipt OCR and recipe suggestions |
| [avocet](https://git.opensourcesolarpunk.com/Circuit-Forge/avocet) | Email classifier training and benchmark harness |
| [osprey](https://git.opensourcesolarpunk.com/Circuit-Forge/osprey) | Government hold-line automation |
| [linnet](https://git.opensourcesolarpunk.com/Circuit-Forge/linnet) | Real-time tone annotation and voice transcription |
| pagepiper | PDF/rulebook RAG (retrieval-augmented generation) search |
---
## Contributing
circuitforge-core is MIT licensed. Contributions are welcome.
```bash
git clone https://git.opensourcesolarpunk.com/Circuit-Forge/circuitforge-core
cd circuitforge-core
pip install -e ".[dev]"
pytest
```
- New modules belong in `circuitforge_core/<module>/` as a package, not a flat file
- Keep modules focused — extract when a module exceeds 400 lines
- All public functions need type annotations
- Tests live in `tests/` — aim for 80% coverage on new code
- Use `ruff` for linting before submitting a PR
Open issues and PRs at: [git.opensourcesolarpunk.com/Circuit-Forge/circuitforge-core](https://git.opensourcesolarpunk.com/Circuit-Forge/circuitforge-core)
---
## License
BSL 1.1 — see LICENSE
MIT — see [LICENSE](LICENSE).
This is the fully open layer of the CircuitForge stack. Products built on top of circuitforge-core may carry different licenses (BSL 1.1 for AI features, proprietary for fine-tuned weights). The scaffold itself is and will remain MIT.
---
Humans own design, architecture, code review, testing, and verification. LLMs are part of our development workflow. [Our positions on LLM use →](https://circuitforge.tech/positions)


@ -1,4 +1,4 @@
__version__ = "0.16.0"
__version__ = "0.18.0"
try:
from circuitforge_core.community import CommunityDB, CommunityPost, SharedStore


@ -312,3 +312,29 @@ class CloudSessionFactory:
return session
return _check
# ── BYOK detection ────────────────────────────────────────────────────────────


def detect_byok(config_path: Path | None = None) -> bool:
    """Return True if at least one enabled non-vision LLM backend is configured.

    Reads the shared llm.yaml that LLMRouter uses. Local (Ollama, vLLM) and
    API-key backends both count: the policy is "user is supplying compute",
    regardless of where that compute lives.

    Args:
        config_path: Override the default config location. Useful in tests.
    """
    import yaml

    if config_path is None:
        config_path = Path.home() / ".config" / "circuitforge" / "llm.yaml"
    try:
        with open(config_path) as f:
            cfg = yaml.safe_load(f) or {}
        return any(
            b.get("enabled", True) and b.get("type") != "vision_service"
            for b in cfg.get("backends", {}).values()
        )
    except Exception:
        return False
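
The predicate at the heart of `detect_byok` can be exercised on plain dicts, without a YAML file. The sketch below copies the `any(...)` expression from the function above; `has_byok_backend` and the sample configs are hypothetical names and data used only for illustration.

```python
def has_byok_backend(cfg: dict) -> bool:
    """Same predicate as detect_byok, applied to an already-parsed config."""
    return any(
        b.get("enabled", True) and b.get("type") != "vision_service"
        for b in cfg.get("backends", {}).values()
    )


# A backend with no "enabled" key counts as enabled (the default is True).
local_only = {"backends": {"ollama": {"type": "openai_compat"}}}
# Vision services never count toward BYOK, enabled or not.
vision_only = {"backends": {"vision": {"type": "vision_service"}}}
# An explicitly disabled backend does not count either.
disabled = {"backends": {"anthropic": {"type": "anthropic", "enabled": False}}}

results = {
    "local_only": has_byok_backend(local_only),
    "vision_only": has_byok_backend(vision_only),
    "disabled": has_byok_backend(disabled),
    "empty": has_byok_backend({}),
}
```

Only `local_only` evaluates to `True`; the other three configs yield `False`, as does any unreadable file in the real function because of its broad `except`.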


@ -0,0 +1,22 @@
-- 006_community_dedup.sql
-- Adds variation-linking and title search support for community recipe dedup.
-- Applies to: cf_community PostgreSQL database.
-- BSL boundary: MIT (data layer, no inference).
-- Nullable self-referential FK: user-declared "this is a variation of X"
ALTER TABLE community_posts
ADD COLUMN IF NOT EXISTS similar_to_ref TEXT REFERENCES community_posts(slug) ON DELETE SET NULL;
-- Index for variation lookup (find all variations of a parent post)
CREATE INDEX IF NOT EXISTS idx_community_posts_similar_ref
ON community_posts (similar_to_ref)
WHERE similar_to_ref IS NOT NULL;
-- Index to speed up title ILIKE prefix and substring searches
CREATE INDEX IF NOT EXISTS idx_community_posts_title_lower
ON community_posts (lower(title));
-- Index on recipe_id for exact-recipe duplicate detection
CREATE INDEX IF NOT EXISTS idx_community_posts_recipe_id
ON community_posts (recipe_id)
WHERE recipe_id IS NOT NULL;


@ -66,6 +66,9 @@ class CommunityPost:
protein_pct: float | None
moisture_pct: float | None
# Variation link: slug of the parent post this is explicitly a variation of
similar_to_ref: str | None = None
def __post_init__(self) -> None:
# Coerce list fields to tuples (frozen dataclass: use object.__setattr__)
for key in ("slots", "dietary_tags", "allergen_flags", "flavor_molecules"):


@ -46,6 +46,7 @@ def _row_to_post(row: dict) -> CommunityPost:
fat_pct=row.get("fat_pct"),
protein_pct=row.get("protein_pct"),
moisture_pct=row.get("moisture_pct"),
similar_to_ref=row.get("similar_to_ref"),
)
@ -137,6 +138,61 @@ class SharedStore:
finally:
self._db.putconn(conn)
    def search_similar_posts(
        self,
        title: str,
        recipe_id: int | None = None,
        post_type: str | None = None,
        limit: int = 8,
    ) -> list[CommunityPost]:
        """Return posts similar to the given title or with the same recipe_id.

        Used by the dedup check before a new post is submitted. Matches on:
        - exact recipe_id (strongest signal)
        - case-insensitive title substring match

        Results are ordered: recipe_id matches first, then by published desc.
        """
        conn = self._db.getconn()
        try:
            conditions: list[str] = []
            params: list = []
            title_condition = "lower(title) LIKE lower(%s)"
            title_param = f"%{title.lower()[:80]}%"
            if recipe_id is not None:
                conditions.append(f"(recipe_id = %s OR {title_condition})")
                params.extend([recipe_id, title_param])
            else:
                conditions.append(title_condition)
                params.append(title_param)
            if post_type:
                conditions.append("post_type = %s")
                params.append(post_type)
            where = "WHERE " + " AND ".join(conditions)
            params.append(limit)
            order_clause = (
                "ORDER BY (recipe_id = %s) DESC, published DESC"
                if recipe_id is not None
                else "ORDER BY published DESC"
            )
            if recipe_id is not None:
                # Slot the ORDER BY parameter in just before the LIMIT parameter.
                params.insert(-1, recipe_id)
            with conn.cursor() as cur:
                cur.execute(
                    f"SELECT * FROM community_posts {where} {order_clause} LIMIT %s",
                    params,
                )
                rows = cur.fetchall()
                return [_row_to_post(_cursor_to_dict(cur, r)) for r in rows]
        finally:
            self._db.putconn(conn)
# ------------------------------------------------------------------
# Writes
# ------------------------------------------------------------------
@ -156,13 +212,14 @@ class SharedStore:
seasoning_score, richness_score, brightness_score,
depth_score, aroma_score, structure_score, texture_profile,
dietary_tags, allergen_flags, flavor_molecules,
fat_pct, protein_pct, moisture_pct, source_product
fat_pct, protein_pct, moisture_pct, source_product,
similar_to_ref
) VALUES (
%s, %s, %s, %s, %s, %s, %s,
%s::jsonb, %s, %s, %s, %s,
%s, %s, %s, %s, %s, %s, %s,
%s::jsonb, %s::jsonb, %s::jsonb,
%s, %s, %s, %s
%s, %s, %s, %s, %s
)
""",
(
@ -178,6 +235,7 @@ class SharedStore:
json.dumps(list(post.flavor_molecules)),
post.fat_pct, post.protein_pct, post.moisture_pct,
self._source_product,
post.similar_to_ref,
),
)
conn.commit()
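
The parameter ordering in `search_similar_posts` is easy to get wrong, because `recipe_id` appears twice (once in `WHERE`, once in `ORDER BY`) and must line up with the `%s` placeholders. The sketch below rebuilds the same SQL as a pure function so the ordering can be inspected without a database. `build_similar_query` is a hypothetical name, and it appends parameters in placeholder order instead of using `insert(-1, ...)`, which produces the same list.

```python
from typing import Optional


def build_similar_query(
    title: str,
    recipe_id: Optional[int] = None,
    post_type: Optional[str] = None,
    limit: int = 8,
) -> tuple:
    """Return (sql, params) with params in the same order as the %s placeholders."""
    conditions = []
    params = []
    title_condition = "lower(title) LIKE lower(%s)"
    title_param = f"%{title.lower()[:80]}%"
    if recipe_id is not None:
        conditions.append(f"(recipe_id = %s OR {title_condition})")
        params.extend([recipe_id, title_param])
    else:
        conditions.append(title_condition)
        params.append(title_param)
    if post_type:
        conditions.append("post_type = %s")
        params.append(post_type)
    if recipe_id is not None:
        order_clause = "ORDER BY (recipe_id = %s) DESC, published DESC"
        params.append(recipe_id)  # second use of recipe_id, for ORDER BY
    else:
        order_clause = "ORDER BY published DESC"
    params.append(limit)  # LIMIT is always the final placeholder
    where = "WHERE " + " AND ".join(conditions)
    sql = f"SELECT * FROM community_posts {where} {order_clause} LIMIT %s"
    return sql, params


sql, params = build_similar_query("Tomato Soup", recipe_id=42, post_type="recipe")
```

For this call the parameter list comes out as `recipe_id`, title pattern, `post_type`, `recipe_id` again, then `limit`, one value per placeholder.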


@ -0,0 +1,133 @@
# circuitforge_core/documents/pdf.py
"""
circuitforge_core.documents.pdf: PDF text extraction and page-level chunking.

Primary path: pdfplumber (selectable text layers).
Fallback: pytesseract OCR (scanned / image-only pages).

Usage::

    from circuitforge_core.documents.pdf import PDFExtractor

    chunks = PDFExtractor().chunk_pages("/path/to/book.pdf")
    for chunk in chunks:
        print(f"[p.{chunk.page_number}] ({chunk.source}) {chunk.text[:80]}")
"""
from __future__ import annotations

import io
import logging
from dataclasses import dataclass
from pathlib import Path

logger = logging.getLogger(__name__)

try:
    import pdfplumber
except ImportError:  # pragma: no cover
    pdfplumber = None  # type: ignore[assignment]

try:
    import pytesseract
except ImportError:  # pragma: no cover
    pytesseract = None  # type: ignore[assignment]

try:
    from PIL import Image
except ImportError:  # pragma: no cover
    Image = None  # type: ignore[assignment]


@dataclass(frozen=True)
class PageChunk:
    """Text content extracted from a single PDF page."""

    page_number: int  # 1-indexed
    text: str
    source: str  # "text_layer" | "ocr"
    word_count: int


class PDFExtractor:
    """
    Extract page-level text chunks from PDF files.

    Args:
        ocr_min_words: Pages with fewer words from the text layer trigger OCR.
    """

    def __init__(self, ocr_min_words: int = 10) -> None:
        self.ocr_min_words = ocr_min_words

    def chunk_pages(self, pdf_path: str | Path) -> list[PageChunk]:
        """
        Primary entry point. Returns one PageChunk per page.

        Uses text-layer extraction per page; falls back to OCR when text is sparse.
        Empty PDFs return an empty list.
        """
        if pdfplumber is None:
            raise ImportError(
                "pdfplumber is required for PDF extraction. "
                "Install it with: pip install pdfplumber"
            )
        path = Path(pdf_path)
        chunks: list[PageChunk] = []
        with pdfplumber.open(path) as pdf:
            for i, page in enumerate(pdf.pages, start=1):
                text = page.extract_text() or ""
                words = text.split()
                if len(words) >= self.ocr_min_words:
                    chunks.append(
                        PageChunk(
                            page_number=i,
                            text=text.strip(),
                            source="text_layer",
                            word_count=len(words),
                        )
                    )
                else:
                    logger.debug(
                        "pdf: page %d sparse (%d words), falling back to OCR",
                        i,
                        len(words),
                    )
                    chunks.append(self._ocr_page(page, i))
        return chunks

    def _ocr_page(self, page: object, page_number: int) -> PageChunk:
        """Render page to image and extract text via tesseract."""
        try:
            rendered = page.to_image(resolution=200).original  # type: ignore[attr-defined]
            rendered = _ensure_pil_image(rendered)
            text = pytesseract.image_to_string(rendered)  # type: ignore[union-attr]
            words = text.split()
            return PageChunk(
                page_number=page_number,
                text=text.strip(),
                source="ocr",
                word_count=len(words),
            )
        except Exception as exc:
            logger.warning("pdf: OCR failed for page %d: %s", page_number, exc)
            return PageChunk(
                page_number=page_number, text="", source="ocr", word_count=0
            )


def _ensure_pil_image(rendered: object) -> object:
    """Return *rendered* as a PIL Image, converting from bytes if needed."""
    if Image is None:
        return rendered
    try:
        if not isinstance(rendered, Image.Image):
            rendered = Image.open(io.BytesIO(rendered))  # type: ignore[arg-type]
    except TypeError:
        # Image may be patched (e.g. in tests); skip the conversion.
        pass
    return rendered
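
The per-page decision in `chunk_pages` (keep the text layer when it has at least `ocr_min_words` words, otherwise run OCR) can be tried without any PDF dependencies. The sketch below is a simplified stand-in rather than the module's code: `pick_source` and `fake_ocr` are hypothetical names, and the OCR step is replaced by a plain callable so the example runs on the standard library alone.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class PageChunk:
    page_number: int  # 1-indexed
    text: str
    source: str  # "text_layer" | "ocr"
    word_count: int


def pick_source(
    page_number: int,
    layer_text: str,
    ocr: Callable[[], str],
    ocr_min_words: int = 10,
) -> PageChunk:
    """Keep the text layer when it has enough words; otherwise call OCR."""
    words = layer_text.split()
    if len(words) >= ocr_min_words:
        return PageChunk(page_number, layer_text.strip(), "text_layer", len(words))
    ocr_text = ocr()  # hypothetical stand-in for pytesseract.image_to_string
    return PageChunk(page_number, ocr_text.strip(), "ocr", len(ocr_text.split()))


def fake_ocr() -> str:
    # Assumption for illustration: pretend tesseract recovered this text.
    return "scanned page text recovered by OCR"


dense = pick_source(1, "one two three four five six seven eight nine ten", fake_ocr)
sparse = pick_source(2, "Fig. 3", fake_ocr)
print(dense.source, dense.word_count)
print(sparse.source, sparse.word_count)
```

Note that the threshold is inclusive: a page with exactly `ocr_min_words` words stays on the text-layer path.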

@ -0,0 +1,15 @@
"""
cf_input.gestures: camera capture, hand detection, landmark normalization.

Public API:
    CameraCapture     OpenCV frame source
    HandsDetector     MediaPipe Hands wrapper
    HandLandmarks     immutable detected hand dataclass
    normalize_hand()  scale/translation-invariant feature vector
"""
from circuitforge_core.input.gestures.camera import CameraCapture
from circuitforge_core.input.gestures.hands import HandLandmarks, HandsDetector
from circuitforge_core.input.gestures.normalizer import normalize_hand

__all__ = ["CameraCapture", "HandLandmarks", "HandsDetector", "normalize_hand"]

@ -0,0 +1,57 @@
"""
OpenCV camera capture: context manager wrapping VideoCapture.

Yields BGR frames. Callers convert to RGB before passing to HandsDetector:

    frame_rgb = frame_bgr[:, :, ::-1]
"""
from __future__ import annotations

from typing import Iterator

import cv2
import numpy as np


class CameraCapture:
    """
    Thin wrapper around cv2.VideoCapture.

    Usage:
        with CameraCapture(device_index=0) as cam:
            for frame_bgr in cam.frames():
                process(frame_bgr)
    """

    def __init__(
        self,
        device_index: int = 0,
        width: int = 640,
        height: int = 480,
        fps: int = 30,
    ) -> None:
        self._cap = cv2.VideoCapture(device_index)
        self._cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
        self._cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
        self._cap.set(cv2.CAP_PROP_FPS, fps)

    @property
    def is_open(self) -> bool:
        return self._cap.isOpened()

    def frames(self) -> Iterator[np.ndarray]:
        """Yield BGR uint8 frames until camera fails or caller breaks."""
        while self._cap.isOpened():
            ok, frame = self._cap.read()
            if not ok:
                break
            yield frame

    def release(self) -> None:
        self._cap.release()

    def __enter__(self) -> CameraCapture:
        return self

    def __exit__(self, *_: object) -> None:
        self.release()
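
The control flow of `frames()` plus the context manager can be exercised without a camera. The following is a minimal sketch, assuming a hypothetical `FakeCapture` class that mimics only the `isOpened()` / `read()` / `release()` contract of `cv2.VideoCapture`; the `Camera` wrapper and the list-based frames are illustrative and not part of the package.

```python
from typing import Iterator, List, Optional, Tuple

Frame = List[int]  # placeholder for a BGR image array


class FakeCapture:
    """Hypothetical stand-in for cv2.VideoCapture with the same read() contract."""

    def __init__(self, frames: List[Frame]) -> None:
        self._frames = list(frames)
        self.released = False

    def isOpened(self) -> bool:
        return not self.released

    def read(self) -> Tuple[bool, Optional[Frame]]:
        if self._frames:
            return True, self._frames.pop(0)
        return False, None  # ok=False when no frame is available

    def release(self) -> None:
        self.released = True


class Camera:
    """Same loop and context-manager shape as CameraCapture, minus OpenCV."""

    def __init__(self, cap: FakeCapture) -> None:
        self._cap = cap

    def frames(self) -> Iterator[Frame]:
        while self._cap.isOpened():
            ok, frame = self._cap.read()
            if not ok or frame is None:
                break  # stop on the first failed read
            yield frame

    def __enter__(self) -> "Camera":
        return self

    def __exit__(self, *_: object) -> None:
        self._cap.release()


cap = FakeCapture([[1, 2, 3], [4, 5, 6]])
with Camera(cap) as cam:
    seen = list(cam.frames())
print(seen, cap.released)
```

The generator ends on the first failed read, and leaving the `with` block releases the device even if the consumer stops iterating early.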

@ -0,0 +1,91 @@
"""
MediaPipe Hands wrapper.

Produces immutable HandLandmarks dataclasses from RGB video frames.
The caller is responsible for BGR-to-RGB conversion before passing frames.
"""
from __future__ import annotations

from dataclasses import dataclass

import mediapipe as mp
import numpy as np


@dataclass(frozen=True)
class HandLandmarks:
    """Immutable snapshot of one detected hand."""

    points: np.ndarray  # shape (21, 3) — x, y, z in [0,1] normalized image space
    handedness: str  # 'Left' | 'Right' (mirror of physical hand)
    confidence: float  # [0.0, 1.0]


class HandsDetector:
    """
    Thin wrapper around mediapipe.solutions.hands.Hands.

    Usage:
        detector = HandsDetector()
        for frame_bgr in camera.frames():
            frame_rgb = frame_bgr[:, :, ::-1]
            hands = detector.detect(frame_rgb)
            for hand in hands:
                vec = normalize_hand(hand.points)
                ...
        detector.close()

    Or use as a context manager:
        with HandsDetector() as detector:
            ...
    """

    def __init__(
        self,
        max_hands: int = 2,
        min_detection_confidence: float = 0.7,
        min_tracking_confidence: float = 0.5,
    ) -> None:
        self._hands = mp.solutions.hands.Hands(
            static_image_mode=False,
            max_num_hands=max_hands,
            min_detection_confidence=min_detection_confidence,
            min_tracking_confidence=min_tracking_confidence,
        )

    def detect(self, rgb_frame: np.ndarray) -> list[HandLandmarks]:
        """
        Run hand detection on one RGB frame.

        Args:
            rgb_frame: (H, W, 3) uint8 RGB image.

        Returns:
            List of HandLandmarks, one per detected hand (up to max_hands).
            Empty list if no hands detected.
        """
        results = self._hands.process(rgb_frame)
        if not results.multi_hand_landmarks:
            return []
        out: list[HandLandmarks] = []
        for lm, hand in zip(results.multi_hand_landmarks, results.multi_handedness):
            points = np.array([[p.x, p.y, p.z] for p in lm.landmark], dtype=np.float32)
            points.flags.writeable = False  # enforce immutability of stored array
            out.append(
                HandLandmarks(
                    points=points,
                    handedness=hand.classification[0].label,
                    confidence=float(hand.classification[0].score),
                )
            )
        return out

    def close(self) -> None:
        self._hands.close()

    def __enter__(self) -> HandsDetector:
        return self

    def __exit__(self, *_: object) -> None:
        self.close()

@ -0,0 +1,33 @@
"""
Landmark normalization for MediaPipe hand landmarks.

Converts raw (21, 3) landmark array into a 63-element translation- and
scale-invariant feature vector suitable for gesture classifiers.
"""
import numpy as np


def normalize_hand(points: np.ndarray) -> np.ndarray:
    """
    Normalize 21 MediaPipe hand landmarks into a scale/translation-invariant
    63-element feature vector.

    Steps:
        1. Translate so wrist (landmark 0) is at origin.
        2. Scale so distance from wrist to middle-finger MCP (landmark 9) = 1.0.
           If that distance is near-zero (degenerate hand), return zeros.
        3. Flatten to shape (63,).

    Args:
        points: (21, 3) float32 array of raw MediaPipe landmark coords.

    Returns:
        (63,) float32 feature vector.
    """
    pts = points.astype(np.float32).copy()
    pts -= pts[0]  # translate: wrist → origin
    scale = float(np.linalg.norm(pts[9]))  # wrist-to-middle-MCP distance
    if scale <= 1e-6:
        # Degenerate hand: return zeros, as documented in step 2.
        return np.zeros(pts.size, dtype=np.float32)
    pts /= scale
    return pts.flatten()
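
The invariance claimed in the docstring can be checked on a toy example. This is a dependency-free sketch following the three documented steps, not the NumPy implementation itself: `normalize_points` and its `ref_index` parameter are hypothetical, and a three-point "hand" stands in for the 21 landmarks.

```python
import math
from typing import List, Sequence

Point = Sequence[float]


def normalize_points(points: Sequence[Point], ref_index: int = 9) -> List[float]:
    """Translate so points[0] is the origin, scale by |points[ref_index]|, flatten."""
    ox, oy, oz = points[0]
    shifted = [(x - ox, y - oy, z - oz) for x, y, z in points]
    scale = math.sqrt(sum(c * c for c in shifted[ref_index]))
    if scale <= 1e-6:
        return [0.0] * (3 * len(points))  # degenerate input: zeros, per the docstring
    return [c / scale for p in shifted for c in p]


# Toy hand: index 0 is the wrist, index 2 plays the role of landmark 9.
hand = [(0.5, 0.5, 0.0), (0.6, 0.5, 0.0), (0.5, 0.7, 0.0)]
# The same hand, uniformly scaled by 2 and shifted in x and y.
moved_and_scaled = [(2 * x + 1.0, 2 * y - 3.0, 2 * z) for x, y, z in hand]

a = normalize_points(hand, ref_index=2)
b = normalize_points(moved_and_scaled, ref_index=2)
same = all(math.isclose(u, v, abs_tol=1e-9) for u, v in zip(a, b))
print(same)
```

Both inputs map to the same vector up to floating-point rounding, which is what lets a downstream gesture classifier ignore where the hand sits in the frame and how far it is from the camera. Rotation is not normalized away by these steps.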

@ -43,6 +43,7 @@ When llm.yaml is absent, the router builds a minimal config from environment
variables: ANTHROPIC_API_KEY, OPENAI_API_KEY / OPENAI_BASE_URL, OLLAMA_HOST.
Ollama on localhost:11434 is always included as the lowest-cost local fallback.
"""
import logging
import os
import yaml
@ -56,8 +57,11 @@ CONFIG_PATH = Path.home() / ".config" / "circuitforge" / "llm.yaml"
class LLMRouter:
def __init__(self, config_path: Path = CONFIG_PATH):
if config_path.exists():
def __init__(self, config_path: Path | dict = CONFIG_PATH):
self._ollama_tags_cache: dict[str, set[str]] = {}
if isinstance(config_path, dict):
self.config = config_path
elif config_path.exists():
with open(config_path) as f:
self.config = yaml.safe_load(f)
else:
@ -70,7 +74,8 @@ class LLMRouter:
)
logger.info(
"[LLMRouter] No llm.yaml found — using env-var auto-config "
"(backends: %s)", ", ".join(env_config["fallback_order"])
"(backends: %s)",
", ".join(env_config["fallback_order"]),
)
self.config = env_config
@ -103,7 +108,9 @@ class LLMRouter:
backends["openai"] = {
"type": "openai_compat",
"enabled": True,
"base_url": os.environ.get("OPENAI_BASE_URL", "https://api.openai.com/v1"),
"base_url": os.environ.get(
"OPENAI_BASE_URL", "https://api.openai.com/v1"
),
"model": os.environ.get("OPENAI_MODEL", "gpt-4o-mini"),
"api_key": os.environ.get("OPENAI_API_KEY"),
"supports_images": True,
@ -141,6 +148,37 @@ class LLMRouter:
except Exception:
return False
def _check_ollama_model_pulled(self, base_url: str, model: str) -> None:
"""Raise RuntimeError with actionable message if model is not pulled in Ollama.
Silently skips the check if the /api/tags endpoint is unavailable (e.g. vLLM).
Results are cached per base_url for the lifetime of this router instance.
"""
tags_url = base_url.rstrip("/").removesuffix("/v1") + "/api/tags"
if not hasattr(self, "_ollama_tags_cache"):
self._ollama_tags_cache = {}
if base_url not in self._ollama_tags_cache:
try:
resp = requests.get(tags_url, timeout=3)
if resp.status_code != 200:
return
pulled = {
m["name"].split(":")[0]
for m in resp.json().get("models", [])
}
self._ollama_tags_cache[base_url] = pulled
except Exception:
return # can't verify — let the actual embed call fail naturally
pulled_models = self._ollama_tags_cache.get(base_url)
if pulled_models is None:
return
model_base = model.split(":")[0]
if model_base not in pulled_models:
raise RuntimeError(
f'Ollama embedding model "{model}" is not pulled.\n'
f"Fix: ollama pull {model}"
)
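
The two string manipulations inside `_check_ollama_model_pulled` are easy to check in isolation. The helpers below are a hedged sketch with hypothetical names (`tags_url_for`, `is_pulled`); they copy only the URL derivation and the name comparison, and leave out the HTTP request and the per-URL cache.

```python
from typing import List


def tags_url_for(base_url: str) -> str:
    """Derive the Ollama /api/tags URL from an OpenAI-compatible base URL."""
    return base_url.rstrip("/").removesuffix("/v1") + "/api/tags"


def is_pulled(model: str, tag_names: List[str]) -> bool:
    """Compare on the part of the name before ':', as the method above does."""
    pulled = {name.split(":")[0] for name in tag_names}
    return model.split(":")[0] in pulled


url = tags_url_for("http://localhost:11434/v1/")
exact = is_pulled("nomic-embed-text", ["nomic-embed-text:latest", "llama3:8b"])
other_tag = is_pulled("llama3:70b", ["llama3:8b"])
missing = is_pulled("bge-m3", ["llama3:8b"])
print(url, exact, other_tag, missing)
```

Because the comparison drops everything after the colon, a request for one tag of a model passes the check when only a different tag of the same model is pulled. In that case the later embeddings call, not this pre-check, is what would surface the problem.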
def _resolve_model(self, client: OpenAI, model: str) -> str:
"""Resolve __auto__ to the first model served by vLLM."""
if model != "__auto__":
@ -152,10 +190,19 @@ class LLMRouter:
"""
If backend config has a cf_orch block and CF_ORCH_URL is set (env takes
precedence over yaml url), allocate via cf-orch and return (ctx, alloc).
Two allocation modes:
- task-based (preferred): cf_orch block has `product` + `task` keys.
Calls POST /api/inference/task; coordinator resolves model/node from
assignments.yaml. No hardcoded model IDs in product config.
- service-based (legacy): cf_orch block has `service` key.
Calls allocate(service=...) directly.
Returns None if not configured or allocation fails.
Caller MUST call ctx.__exit__(None, None, None) in a finally block.
"""
import os
orch_cfg = backend.get("cf_orch")
if not orch_cfg:
return None
@ -164,31 +211,46 @@ class LLMRouter:
return None
try:
from circuitforge_orch.client import CFOrchClient
client = CFOrchClient(orch_url)
service = orch_cfg.get("service", "vllm")
candidates = orch_cfg.get("model_candidates", [])
ttl_s = float(orch_cfg.get("ttl_s", 3600.0))
# CF_APP_NAME identifies the calling product (kiwi, peregrine, etc.)
# in coordinator analytics — set in each product's .env.
pipeline = os.environ.get("CF_APP_NAME") or None
# Task-based allocation: product+task → coordinator resolves model/node.
task = orch_cfg.get("task")
product = orch_cfg.get("product") or os.environ.get("CF_APP_NAME") or None
if task and product:
ctx = client.task_allocate(product, task, ttl_s=ttl_s)
alloc = ctx.__enter__()
return (ctx, alloc)
# Service-based allocation (legacy path).
cf_app = os.environ.get("CF_APP_NAME") or None
caller = f"{cf_app}.llm-router" if cf_app else "llm-router"
ctx = client.allocate(
service,
model_candidates=candidates,
orch_cfg.get("service", "vllm"),
model_candidates=orch_cfg.get("model_candidates", []),
ttl_s=ttl_s,
caller="llm-router",
pipeline=pipeline,
caller=caller,
pipeline=cf_app,
)
alloc = ctx.__enter__()
return (ctx, alloc)
except Exception as exc:
logger.warning("[LLMRouter] cf_orch allocation failed, using base_url directly: %s", exc)
logger.warning(
"[LLMRouter] cf_orch allocation failed, using base_url directly: %s",
exc,
)
return None
def complete(self, prompt: str, system: str | None = None,
def complete(
self,
prompt: str,
system: str | None = None,
model_override: str | None = None,
fallback_order: list[str] | None = None,
images: list[str] | None = None,
max_tokens: int | None = None) -> str:
max_tokens: int | None = None,
) -> str:
"""
Generate a completion. Tries each backend in fallback_order.
@ -206,7 +268,11 @@ class LLMRouter:
"AI inference is disabled in the public demo. "
"Run your own instance to use AI features."
)
order = fallback_order if fallback_order is not None else self.config["fallback_order"]
order = (
fallback_order
if fallback_order is not None
else self.config["fallback_order"]
)
for name in order:
backend = self.config["backends"][name]
@ -283,10 +349,14 @@ class LLMRouter:
if images and supports_images:
content = [{"type": "text", "text": prompt}]
for img in images:
content.append({
content.append(
{
"type": "image_url",
"image_url": {"url": f"data:image/png;base64,{img}"},
})
"image_url": {
"url": f"data:image/png;base64,{img}"
},
}
)
messages.append({"role": "user", "content": content})
else:
messages.append({"role": "user", "content": prompt})
@ -311,18 +381,27 @@ class LLMRouter:
elif backend["type"] == "anthropic":
api_key = os.environ.get(backend["api_key_env"], "")
if not api_key:
print(f"[LLMRouter] {name}: {backend['api_key_env']} not set, skipping")
print(
f"[LLMRouter] {name}: {backend['api_key_env']} not set, skipping"
)
continue
try:
import anthropic as _anthropic
client = _anthropic.Anthropic(api_key=api_key)
if images and supports_images:
content = []
for img in images:
content.append({
content.append(
{
"type": "image",
"source": {"type": "base64", "media_type": "image/png", "data": img},
})
"source": {
"type": "base64",
"media_type": "image/png",
"data": img,
},
}
)
content.append({"type": "text", "text": prompt})
else:
content = prompt
@ -342,6 +421,84 @@ class LLMRouter:
raise RuntimeError("All LLM backends exhausted")
def embed(
self,
texts: list[str],
model_override: str | None = None,
fallback_order: list[str] | None = None,
) -> list[list[float]]:
"""
Generate embeddings for a list of texts.
Only openai_compat backends are tried: Ollama and vLLM expose
/v1/embeddings; anthropic and vision_service do not.
Uses ``embedding_model`` from backend config when present;
falls back to ``model`` (the chat model) otherwise.
Args:
texts: Texts to embed (batched in a single API call).
model_override: Override the embedding model for this call.
fallback_order: Override the backend fallback order for this call.
Returns:
List of float vectors, one per input text, in input order.
Raises:
RuntimeError: If all eligible backends are exhausted.
"""
if os.environ.get("DEMO_MODE", "").lower() in ("1", "true", "yes"):
raise RuntimeError(
"AI inference is disabled in the public demo. "
"Run your own instance to use AI features."
)
order = (
fallback_order
if fallback_order is not None
else self.config["fallback_order"]
)
for name in order:
backend = self.config["backends"][name]
if not backend.get("enabled", True):
continue
if backend["type"] != "openai_compat":
continue
orch_ctx = orch_alloc = None
orch_result = self._try_cf_orch_alloc(backend)
if orch_result is not None:
orch_ctx, orch_alloc = orch_result
backend = {**backend, "base_url": orch_alloc.url + "/v1"}
elif not self._is_reachable(backend["base_url"]):
print(f"[LLMRouter] {name}: unreachable, skipping")
continue
embed_model = model_override or backend.get(
"embedding_model", backend["model"]
)
self._check_ollama_model_pulled(backend["base_url"], embed_model)
try:
client = OpenAI(
base_url=backend["base_url"],
api_key=backend.get("api_key") or "any",
)
model = embed_model
resp = client.embeddings.create(model=model, input=texts)
print(f"[LLMRouter] embed: used backend {name} ({model})")
return [item.embedding for item in resp.data]
except Exception as e:
print(f"[LLMRouter] {name}: embed error — {e}, trying next")
continue
finally:
if orch_ctx is not None:
try:
orch_ctx.__exit__(None, None, None)
except Exception:
pass
raise RuntimeError("All LLM backends exhausted for embed()")
# Module-level singleton for convenience
_router: LLMRouter | None = None
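
The backend selection in `embed()` reduces to a filtered fallback loop. The sketch below is illustrative only: `embed_with_fallback` and the `embedders` mapping are hypothetical, network clients are replaced by plain callables, and the reachability check and cf-orch allocation are omitted.

```python
from typing import Callable, Dict, List

Embedder = Callable[[List[str]], List[List[float]]]


def embed_with_fallback(
    texts: List[str],
    backends: Dict[str, dict],
    order: List[str],
    embedders: Dict[str, Embedder],
) -> List[List[float]]:
    """Try each enabled openai_compat backend in order; return the first success."""
    for name in order:
        cfg = backends[name]
        if not cfg.get("enabled", True) or cfg["type"] != "openai_compat":
            continue  # e.g. anthropic backends expose no embeddings endpoint
        try:
            return embedders[name](texts)
        except Exception as exc:
            print(f"{name}: embed error ({exc}), trying next")
    raise RuntimeError("All LLM backends exhausted for embed()")


def broken(texts: List[str]) -> List[List[float]]:
    raise ConnectionError("connection refused")


def toy_embedder(texts: List[str]) -> List[List[float]]:
    # Assumption for illustration: a 2-dimensional "embedding" based on length.
    return [[float(len(t)), 0.0] for t in texts]


backends = {
    "anthropic": {"type": "anthropic"},
    "vllm": {"type": "openai_compat"},
    "ollama": {"type": "openai_compat"},
}
vectors = embed_with_fallback(
    ["hello", "hi"],
    backends,
    ["anthropic", "vllm", "ollama"],
    {"vllm": broken, "ollama": toy_embedder},
)
print(vectors)
```

Here the `anthropic` entry is skipped by type, `vllm` fails and is logged, and `ollama` supplies the vectors in input order.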

@ -92,7 +92,21 @@ def make_reranker(
return MockTextReranker()
_model_id = model_id or os.environ.get("CF_RERANKER_MODEL", _DEFAULT_MODEL)
_backend = backend or os.environ.get("CF_RERANKER_BACKEND", "bge")
_backend = backend or os.environ.get("CF_RERANKER_BACKEND", "")
# Auto-route to cf-orch when CF_ORCH_URL is set and no explicit backend override.
# Cloud deployments set CF_ORCH_URL; local dev leaves it unset → local inference.
if not _backend:
orch_url = os.environ.get("CF_ORCH_URL", "")
if orch_url:
from circuitforge_core.reranker.adapters.remote import RemoteTextReranker
logger.info("[reranker] CF_ORCH_URL set — using remote cf-reranker via cf-orch")
return RemoteTextReranker.from_cf_orch(
orch_url=orch_url,
service="cf-reranker",
ttl_s=float(os.environ.get("CF_RERANKER_TTL", "3600")),
)
_backend = "bge" # local default
if _backend == "mock":
return MockTextReranker()
@ -101,10 +115,25 @@ def make_reranker(
from circuitforge_core.reranker.adapters.bge import BGETextReranker
return BGETextReranker(_model_id)
if _backend == "qwen3":
from circuitforge_core.reranker.adapters.qwen3 import Qwen3TextReranker
return Qwen3TextReranker(_model_id)
if _backend == "cross-encoder":
from circuitforge_core.reranker.adapters.cross_encoder import CrossEncoderTextReranker
return CrossEncoderTextReranker(_model_id)
if _backend == "cohere":
from circuitforge_core.reranker.adapters.cohere import CohereTextReranker
return CohereTextReranker(model=_model_id)
if _backend == "remote":
from circuitforge_core.reranker.adapters.remote import RemoteTextReranker
return RemoteTextReranker(_model_id)
raise ValueError(
f"Unknown reranker backend {_backend!r}. "
"Valid options: 'bge', 'mock'. "
"(Qwen3 support is coming in Phase 2.)"
"Valid options: 'bge', 'qwen3', 'cross-encoder', 'cohere', 'remote', 'mock'."
)
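
The precedence that this hunk gives to backend selection can be summarised as a small pure function. This is a sketch under stated assumptions: `resolve_backend` is a hypothetical helper, the environment is passed in as a mapping rather than read from `os.environ`, and the early mock return that precedes this code in `make_reranker` is left out.

```python
from typing import Mapping, Optional


def resolve_backend(explicit: Optional[str], env: Mapping[str, str]) -> str:
    """Explicit argument, then CF_RERANKER_BACKEND, then cf-orch, then local bge."""
    backend = explicit or env.get("CF_RERANKER_BACKEND", "")
    if backend:
        return backend
    if env.get("CF_ORCH_URL", ""):
        return "remote via cf-orch"  # label for illustration only
    return "bge"


local_dev = resolve_backend(None, {})
cloud = resolve_backend(None, {"CF_ORCH_URL": "http://orch.example:7700"})
pinned = resolve_backend(
    None, {"CF_ORCH_URL": "http://orch.example:7700", "CF_RERANKER_BACKEND": "qwen3"}
)
forced = resolve_backend("cohere", {"CF_RERANKER_BACKEND": "qwen3"})
print(local_dev, cloud, pinned, forced)
```

An explicit backend, whether passed as an argument or set through `CF_RERANKER_BACKEND`, always wins over the cf-orch auto-route, so a cloud deployment can still pin local inference.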

@ -0,0 +1,94 @@
# circuitforge_core/reranker/adapters/cohere.py — Cohere Rerank API (BYOK cloud)
#
# Requires: pip install circuitforge-core[reranker-cohere]
# API key: set COHERE_API_KEY env var, or pass api_key= explicitly.
#
# Models (as of 2026):
#   rerank-english-v3.0        English-only, highest quality
#   rerank-multilingual-v3.0   Multilingual
#   rerank-english-v2.0        Legacy, lower cost
#
# BYOK unlock path: free-tier users who supply their own Cohere key get cloud
# reranking without needing a cf-orch node. Same pattern as the Anthropic
# backend in LLMRouter.
#
# MIT licensed.
from __future__ import annotations
import logging
import os
from typing import Sequence
from circuitforge_core.reranker.base import TextReranker
logger = logging.getLogger(__name__)
try:
import cohere as _cohere # type: ignore[import]
except ImportError:
_cohere = None # type: ignore[assignment]
_DEFAULT_MODEL = "rerank-english-v3.0"
class CohereTextReranker(TextReranker):
"""
Cloud reranker backed by the Cohere Rerank API.
BYOK (bring your own key): pass api_key= or set COHERE_API_KEY in the
environment. No model weights loaded locally.
Usage:
reranker = CohereTextReranker() # reads COHERE_API_KEY from env
results = reranker.rerank("chicken soup recipe", ["recipe 1...", "recipe 2..."])
With an explicit key and model:
reranker = CohereTextReranker(
api_key="co-...",
model="rerank-multilingual-v3.0",
)
"""
def __init__(
self,
api_key: str | None = None,
model: str = _DEFAULT_MODEL,
max_chunks_per_doc: int = 1,
) -> None:
self._api_key_arg = api_key
self._model = model
self._max_chunks_per_doc = max_chunks_per_doc
@property
def model_id(self) -> str:
return f"cohere:{self._model}"
def _get_client(self) -> object:
if _cohere is None:
raise ImportError(
"cohere is not installed. "
"Run: pip install circuitforge-core[reranker-cohere]"
)
api_key = self._api_key_arg or os.environ.get("COHERE_API_KEY", "")
if not api_key:
raise RuntimeError(
"Cohere API key is not set. "
"Pass api_key= to CohereTextReranker or set COHERE_API_KEY."
)
return _cohere.Client(api_key=api_key)
def _score_pairs(self, query: str, candidates: list[str]) -> list[float]:
client = self._get_client()
response = client.rerank( # type: ignore[union-attr]
query=query,
documents=candidates,
model=self._model,
top_n=len(candidates),
max_chunks_per_doc=self._max_chunks_per_doc,
)
# response.results is sorted by relevance_score desc; rebuild
# in original candidate order so TextReranker.rerank() re-sorts correctly.
score_map: dict[int, float] = {
r.index: r.relevance_score for r in response.results
}
return [score_map.get(i, 0.0) for i in range(len(candidates))]
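
The reordering at the end of `_score_pairs` is worth seeing with concrete numbers. In this sketch `Result` is a hypothetical stand-in for a rerank result object that carries only the two attributes the adapter reads (`index` and `relevance_score`); no Cohere client is involved.

```python
from typing import List, NamedTuple


class Result(NamedTuple):
    """Hypothetical stand-in for one rerank result."""

    index: int  # position of the document in the request
    relevance_score: float


def scores_in_input_order(results: List[Result], n: int) -> List[float]:
    """Map relevance-sorted results back to the original candidate order."""
    score_map = {r.index: r.relevance_score for r in results}
    return [score_map.get(i, 0.0) for i in range(n)]


# Results arrive sorted by score; candidate 1 is absent from the response.
api_results = [Result(index=2, relevance_score=0.91), Result(index=0, relevance_score=0.40)]
ordered = scores_in_input_order(api_results, 3)
print(ordered)
```

Any candidate missing from the response falls back to a score of 0.0 instead of shifting the remaining scores out of position.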

@ -0,0 +1,96 @@
# circuitforge_core/reranker/adapters/cross_encoder.py — sentence-transformers CrossEncoder
#
# Requires: pip install circuitforge-core[reranker-cross-encoder]
#
# Covers models not in the FlagEmbedding ecosystem:
#   mixedbread-ai/mxbai-rerank-base-v1           ~570MB VRAM, strong general-purpose
#   mixedbread-ai/mxbai-rerank-large-v1          ~1.3GB VRAM, higher quality
#   cross-encoder/ms-marco-MiniLM-L-6-v2         ~90MB, fast, English-only
#   cross-encoder/ms-marco-MiniLM-L-12-v2        ~130MB, balanced
#   jinaai/jina-reranker-v2-base-multilingual    ~280MB, multilingual
#
# MIT licensed.
from __future__ import annotations
import logging
import threading
from typing import Sequence
from circuitforge_core.reranker.base import TextReranker
logger = logging.getLogger(__name__)
try:
from sentence_transformers import CrossEncoder as _CrossEncoder # type: ignore[import]
except ImportError:
_CrossEncoder = None # type: ignore[assignment]
def _cuda_available() -> bool:
try:
import torch
return torch.cuda.is_available()
except ImportError:
return False
class CrossEncoderTextReranker(TextReranker):
"""
Cross-encoder reranker using the sentence-transformers CrossEncoder class.
Broader model compatibility than BGETextReranker: any Hugging Face model
with a sequence-classification head works here. Particularly well-suited
for the mxbai-rerank and ms-marco families.
Usage:
reranker = CrossEncoderTextReranker("mixedbread-ai/mxbai-rerank-base-v1")
results = reranker.rerank("chicken soup recipe", ["recipe 1...", "recipe 2..."])
"""
def __init__(
self,
model_id: str = "mixedbread-ai/mxbai-rerank-base-v1",
max_length: int = 512,
) -> None:
self._model_id = model_id
self._max_length = max_length
self._model: object | None = None
self._lock = threading.Lock()
@property
def model_id(self) -> str:
return self._model_id
def load(self) -> None:
"""Explicitly load model weights. Called automatically on first rerank()."""
if _CrossEncoder is None:
raise ImportError(
"sentence-transformers is not installed. "
"Run: pip install circuitforge-core[reranker-cross-encoder]"
)
with self._lock:
if self._model is not None:
return
device = "cuda" if _cuda_available() else "cpu"
logger.info(
"Loading CrossEncoder reranker: %s (device=%s)", self._model_id, device
)
self._model = _CrossEncoder(
self._model_id,
max_length=self._max_length,
device=device,
)
def unload(self) -> None:
"""Release model weights."""
with self._lock:
self._model = None
def _score_pairs(self, query: str, candidates: list[str]) -> list[float]:
if self._model is None:
self.load()
pairs = [(query, c) for c in candidates]
with self._lock:
raw = self._model.predict(pairs) # type: ignore[union-attr]
# predict() returns a numpy array or list; normalise to plain floats.
return [float(s) for s in raw]
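
The load-once behaviour of `load()` depends on re-checking `self._model` after the lock is acquired. The following is a minimal sketch of that pattern with a hypothetical `LazyModel` class and a counting factory in place of real model weights.

```python
import threading
from typing import Callable, List, Optional


class LazyModel:
    """Load an expensive object at most once, even under concurrent first use."""

    def __init__(self, factory: Callable[[], object]) -> None:
        self._factory = factory
        self._model: Optional[object] = None
        self._lock = threading.Lock()

    def load(self) -> None:
        with self._lock:
            if self._model is not None:
                return  # another thread finished loading while we waited
            self._model = self._factory()

    def get(self) -> object:
        if self._model is None:
            self.load()
        return self._model


calls: List[int] = []


def factory() -> object:
    calls.append(1)  # count how many times the "weights" are loaded
    return object()


lazy = LazyModel(factory)
threads = [threading.Thread(target=lazy.get) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(calls))
```

Several threads may pass the unlocked `is None` test at the same time, but only the first one to take the lock runs the factory; the rest see the populated attribute and return.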

@ -0,0 +1,239 @@
# circuitforge_core/reranker/adapters/qwen3.py — Qwen3-Reranker adapter
#
# Requires: pip install circuitforge-core[reranker-qwen3]
# Tested with: Qwen/Qwen3-Reranker-0.6B, -1.5B, -8B
#
# Scoring mechanism (generative reranker):
# Rather than generating a full response, we pre-fill the assistant turn with
# the <think>\n\n</think>\n block and read the logits at the last input token
# position. The softmax probability of "yes" vs "no" at that position is the
# relevance score — one forward pass per batch, no generation loop.
#
# Prompt format (Qwen3 chat template):
# system: "Judge whether the Document meets the requirements based on the
# Query and the Instruct. Note that the answer can only be 'yes'
# or 'no'."
# user: "<Instruct>: {task}\n<Query>: {query}\n<Document>: {doc}"
# assistant (pre-filled): "<think>\n\n</think>\n\n"
#
# MIT licensed.
from __future__ import annotations
import logging
import threading
from typing import Sequence
from circuitforge_core.reranker.base import TextReranker
logger = logging.getLogger(__name__)
# Optional heavy deps: imported here when available; load() raises ImportError if missing.
try:
import torch as _torch # type: ignore[import]
except ImportError:
_torch = None # type: ignore[assignment]
try:
from transformers import AutoModelForCausalLM as _AutoModel # type: ignore[import]
from transformers import AutoTokenizer as _AutoTokenizer # type: ignore[import]
except ImportError:
_AutoModel = None # type: ignore[assignment]
_AutoTokenizer = None # type: ignore[assignment]
# System prompt used for all reranking tasks.
_SYSTEM_PROMPT = (
"Judge whether the Document meets the requirements based on the Query and "
'the Instruct. Note that the answer can only be "yes" or "no".'
)
# Default task instruction — products can override via make_reranker(task=...).
_DEFAULT_TASK = "Given a query, retrieve the most relevant document that answers the query."
# The pre-filled assistant turn that puts the model past its thinking block
# so the very next token position scores "yes" vs "no".
_ASSISTANT_PREFILL = "<think>\n\n</think>\n\n"
def _requires_deps() -> None:
if _torch is None:
raise ImportError(
"torch is not installed. Run: pip install circuitforge-core[reranker-qwen3]"
)
if _AutoModel is None:
raise ImportError(
"transformers is not installed. Run: pip install circuitforge-core[reranker-qwen3]"
)
class Qwen3TextReranker(TextReranker):
"""
Generative reranker using the Qwen3-Reranker model family.
Scores candidates by reading yes/no token logits at the last input position
after pre-filling the assistant thinking block. One forward pass covers an
entire batch, which is efficient for ranking large candidate lists.
Model options (by tier):
Free: Qwen/Qwen3-Reranker-0.6B (~1.2 GB VRAM fp16)
Qwen/Qwen3-Reranker-1.5B (~3.0 GB VRAM fp16)
Paid: Qwen/Qwen3-Reranker-8B (~16 GB VRAM fp16, or ~9 GB int8)
Usage:
reranker = Qwen3TextReranker("Qwen/Qwen3-Reranker-0.6B")
results = reranker.rerank("chicken soup recipe", ["recipe 1...", "recipe 2..."])
With a custom task instruction:
reranker = Qwen3TextReranker(
"Qwen/Qwen3-Reranker-1.5B",
task="Given a job description, retrieve the most relevant resume.",
)
"""
def __init__(
self,
model_id: str = "Qwen/Qwen3-Reranker-0.6B",
task: str = _DEFAULT_TASK,
device: str | None = None,
dtype: str = "float16",
batch_size: int = 32,
) -> None:
self._model_id = model_id
self._task = task
self._device = device # None = auto-detect at load time
self._dtype_str = dtype
self._batch_size = batch_size
self._model: object | None = None
self._tokenizer: object | None = None
self._yes_id: int | None = None
self._no_id: int | None = None
self._lock = threading.Lock()
@property
def model_id(self) -> str:
return self._model_id
def load(self) -> None:
"""Explicitly load model weights. Called automatically on first rerank()."""
_requires_deps()
with self._lock:
if self._model is not None:
return
device = self._device or ("cuda" if _torch.cuda.is_available() else "cpu")
dtype_map: dict[str, object] = {
"float16": _torch.float16,
"bfloat16": _torch.bfloat16,
"float32": _torch.float32,
}
torch_dtype = dtype_map.get(self._dtype_str, _torch.float16)
logger.info(
"Loading Qwen3 reranker: %s (device=%s dtype=%s)",
self._model_id, device, self._dtype_str,
)
tokenizer = _AutoTokenizer.from_pretrained(
self._model_id, trust_remote_code=True
)
model = _AutoModel.from_pretrained(
self._model_id,
torch_dtype=torch_dtype,
device_map=device,
trust_remote_code=True,
)
model.eval()
# Resolve the token IDs for "yes" and "no" once at load time.
# Qwen tokenizers encode single-word tokens without a leading space.
yes_ids: list[int] = tokenizer.encode("yes", add_special_tokens=False)
no_ids: list[int] = tokenizer.encode("no", add_special_tokens=False)
if not yes_ids or not no_ids:
raise RuntimeError(
f"Could not resolve 'yes'/'no' token IDs from tokenizer {self._model_id!r}. "
"This model may not be a Qwen3-Reranker variant."
)
self._tokenizer = tokenizer
self._model = model
self._yes_id = yes_ids[0]
self._no_id = no_ids[0]
def unload(self) -> None:
"""Release model weights. Useful for VRAM management between tasks."""
with self._lock:
self._model = None
self._tokenizer = None
self._yes_id = None
self._no_id = None
def _build_prompt(self, query: str, document: str) -> str:
"""Format a single (query, document) pair as a chat-template prompt."""
messages = [
{"role": "system", "content": _SYSTEM_PROMPT},
{
"role": "user",
"content": (
f"<Instruct>: {self._task}\n"
f"<Query>: {query}\n"
f"<Document>: {document}"
),
},
]
# apply_chat_template without tokenization so we can append the prefill.
text: str = self._tokenizer.apply_chat_template( # type: ignore[union-attr]
messages,
tokenize=False,
add_generation_prompt=True,
)
return text + _ASSISTANT_PREFILL
def _score_pairs(self, query: str, candidates: list[str]) -> list[float]:
if self._model is None:
self.load()
return self._score_in_batches(query, candidates)
def _score_in_batches(self, query: str, candidates: list[str]) -> list[float]:
"""Score all (query, candidate) pairs, splitting into sub-batches."""
all_scores: list[float] = []
for start in range(0, len(candidates), self._batch_size):
batch = candidates[start : start + self._batch_size]
all_scores.extend(self._score_batch(query, batch))
return all_scores
def _score_batch(self, query: str, candidates: list[str]) -> list[float]:
"""Single forward pass for one sub-batch. Returns a score per candidate."""
prompts = [self._build_prompt(query, c) for c in candidates]
# Left-pad so the last token position is consistent across all sequences.
tokenizer = self._tokenizer # type: ignore[union-attr]
original_side = getattr(tokenizer, "padding_side", "right")
tokenizer.padding_side = "left"
try:
encoded = tokenizer(
prompts,
return_tensors="pt",
padding=True,
truncation=True,
max_length=4096,
)
finally:
tokenizer.padding_side = original_side
model = self._model # type: ignore[union-attr]
device = next(model.parameters()).device # type: ignore[union-attr]
input_ids = encoded["input_ids"].to(device)
attention_mask = encoded["attention_mask"].to(device)
with self._lock:
with _torch.no_grad():
outputs = model(input_ids=input_ids, attention_mask=attention_mask)
# logits shape: (batch, seq_len, vocab_size)
# Last position [-1] is the token the model would output next.
last_logits = outputs.logits[:, -1, :] # (batch, vocab)
yes_logits = last_logits[:, self._yes_id] # (batch,)
no_logits = last_logits[:, self._no_id] # (batch,)
# Softmax over yes/no only — score = P(yes | query, doc).
stacked = _torch.stack([yes_logits, no_logits], dim=-1) # (batch, 2)
probs = _torch.softmax(stacked, dim=-1)
scores: list[float] = probs[:, 0].float().cpu().tolist()
return scores
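
The score computed at the end of `_score_batch` is a softmax restricted to two logits. A scalar sketch in plain Python shows the arithmetic; `yes_probability` is a hypothetical helper, and the logit values are made up for illustration.

```python
import math


def yes_probability(yes_logit: float, no_logit: float) -> float:
    """Two-way softmax over the yes/no logits; returns P(yes)."""
    m = max(yes_logit, no_logit)  # subtract the max for numerical stability
    e_yes = math.exp(yes_logit - m)
    e_no = math.exp(no_logit - m)
    return e_yes / (e_yes + e_no)


def sigmoid(x: float) -> float:
    return 1.0 / (1.0 + math.exp(-x))


relevant = yes_probability(4.2, 1.2)
irrelevant = yes_probability(-0.5, 2.5)
tie = yes_probability(3.0, 3.0)
print(round(relevant, 4), round(irrelevant, 4), tie)
```

A two-way softmax equals the sigmoid of the logit difference, so only the gap between the "yes" and "no" logits matters and the rest of the vocabulary has no effect on the score.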

@ -0,0 +1,131 @@
# circuitforge_core/reranker/adapters/remote.py — HTTP remote reranker adapter
#
# Calls a cf-reranker service endpoint (cf-orch allocated or static URL).
# No model weights loaded locally — all inference runs on the remote node.
#
# MIT licensed.
from __future__ import annotations
import logging
from typing import Sequence
import requests
from circuitforge_core.reranker.base import TextReranker
logger = logging.getLogger(__name__)
# Default timeout for a single /rerank call (seconds).
# Large candidate lists may take longer — callers can pass timeout= explicitly.
_DEFAULT_TIMEOUT = 30
class RemoteTextReranker(TextReranker):
"""
Reranker that delegates scoring to a remote cf-reranker HTTP service.
The remote service must implement POST /rerank with the request body::
{"query": str, "candidates": [str, ...], "top_n": int}
and return::
{"results": [{"candidate": str, "score": float, "rank": int}, ...]}
cf-orch allocation (recommended; starts the service on demand):
reranker = RemoteTextReranker.from_cf_orch(
orch_url="http://10.1.10.71:7700",
service="cf-reranker",
model_candidates=["qwen3-0.6b"],
)
Static URL (e.g. dedicated node already running cf-reranker):
reranker = RemoteTextReranker("http://10.1.10.10:8011")
"""
def __init__(
self,
base_url: str,
timeout: int = _DEFAULT_TIMEOUT,
_model_id: str = "remote",
) -> None:
self._base_url = base_url.rstrip("/")
self._timeout = timeout
self._model_id_str = _model_id
@property
def model_id(self) -> str:
return self._model_id_str
@classmethod
def from_cf_orch(
cls,
orch_url: str,
service: str = "cf-reranker",
model_candidates: list[str] | None = None,
ttl_s: float = 3600.0,
timeout: int = _DEFAULT_TIMEOUT,
) -> "RemoteTextReranker":
"""
Allocate a cf-reranker service via cf-orch and return a configured adapter.
Blocks until allocation succeeds or raises on failure. The returned
adapter is valid for the duration of the TTL; create a new one if the
lease expires.
This is a one-shot allocation: the caller owns the lifetime. For
long-running services, prefer the static URL constructor and let
cf-orch manage the process independently.
"""
try:
from circuitforge_orch.client import CFOrchClient # type: ignore[import]
except ImportError as exc:
raise ImportError(
"circuitforge_orch is not installed — cannot allocate via cf-orch."
) from exc
client = CFOrchClient(orch_url)
ctx = client.allocate(
service,
model_candidates=model_candidates or [],
ttl_s=ttl_s,
caller="reranker-remote",
)
alloc = ctx.__enter__()
# Note: caller is responsible for ctx.__exit__() when done.
# We stash it on the instance so callers can call release().
instance = cls(
base_url=alloc.url,
timeout=timeout,
_model_id=f"remote:{service}",
)
instance._orch_ctx = ctx # type: ignore[attr-defined]
return instance
def release(self) -> None:
"""Release the cf-orch allocation if this adapter was created via from_cf_orch()."""
ctx = getattr(self, "_orch_ctx", None)
if ctx is not None:
try:
ctx.__exit__(None, None, None)
except Exception:
logger.debug("cf-orch release failed; ignoring", exc_info=True)
self._orch_ctx = None # type: ignore[attr-defined]
def _score_pairs(self, query: str, candidates: list[str]) -> list[float]:
url = f"{self._base_url}/rerank"
payload = {"query": query, "candidates": candidates, "top_n": 0}
try:
resp = requests.post(url, json=payload, timeout=self._timeout)
resp.raise_for_status()
except requests.RequestException as exc:
raise RuntimeError(
f"Remote reranker at {url!r} failed: {exc}"
) from exc
data = resp.json()
# Build a score-per-candidate list in the original order.
score_map: dict[str, float] = {
r["candidate"]: r["score"] for r in data["results"]
}
return [score_map.get(c, 0.0) for c in candidates]
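
The order-restoring step at the end of `_score_pairs` can be sketched in isolation. This is a hypothetical standalone helper (`scores_in_input_order` is not in the source) that works on an already-decoded response body; the `rank` values in the sample response are illustrative only:

```python
from typing import Any


def scores_in_input_order(
    candidates: list[str], results: list[dict[str, Any]]
) -> list[float]:
    """Map service results (sorted by rank) back to the caller's order."""
    score_map = {r["candidate"]: float(r["score"]) for r in results}
    # Candidates missing from the response fall back to 0.0.
    return [score_map.get(c, 0.0) for c in candidates]


response = {
    "results": [
        {"candidate": "b", "score": 0.9, "rank": 1},
        {"candidate": "a", "score": 0.2, "rank": 2},
    ]
}
print(scores_in_input_order(["a", "b", "c"], response["results"]))
```

Because the map is keyed by candidate text, duplicate candidates share a single score, and a candidate the service omitted is indistinguishable from one that genuinely scored 0.0.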


@@ -0,0 +1,169 @@
"""
circuitforge_core.reranker.app -- cf-reranker FastAPI service.
Managed by cf-orch as a process-type service. cf-orch starts this via:
python -m circuitforge_core.reranker.app \
--model BAAI/bge-reranker-base \
--backend bge \
--port 8011 \
--gpu-id 0
Or with Qwen3:
python -m circuitforge_core.reranker.app \
--model Qwen/Qwen3-Reranker-0.6B \
--backend qwen3 \
--port 8011 \
--gpu-id 0 \
--dtype float16
Endpoints:
GET /health -> {"status": "ok", "model": "...", "backend": "...", "vram_mb": n}
POST /rerank -> RerankResponse
"""
from __future__ import annotations
import argparse
import logging
import os
import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
logger = logging.getLogger(__name__)
# ── Request / response models ─────────────────────────────────────────────────
class RerankRequest(BaseModel):
query: str
candidates: list[str]
top_n: int = 0
class RerankResultItem(BaseModel):
candidate: str
score: float
rank: int
class RerankResponse(BaseModel):
results: list[RerankResultItem]
model: str
class HealthResponse(BaseModel):
status: str
model: str
backend: str
vram_mb: int
# ── VRAM estimates by backend/model family ────────────────────────────────────
_VRAM_TABLE: dict[str, int] = {
"bge-reranker-base": 570,
"bge-reranker-large": 1300,
"bge-reranker-v2-m3": 570,
"mxbai-rerank-base-v1": 570,
"mxbai-rerank-large-v1": 1300,
"ms-marco-MiniLM-L-6-v2": 90,
"ms-marco-MiniLM-L-12-v2": 130,
"Qwen3-Reranker-0.6B": 1200,
"Qwen3-Reranker-1.5B": 3000,
"Qwen3-Reranker-8B": 16000,
}
def _estimate_vram(model_id: str) -> int:
for key, mb in _VRAM_TABLE.items():
if key in model_id:
return mb
return 1024 # safe default
# ── App factory ───────────────────────────────────────────────────────────────
def create_app(model_id: str, backend: str, dtype: str, mock: bool) -> FastAPI:
from circuitforge_core.reranker import make_reranker
app = FastAPI(title="cf-reranker", version="0.1.0")
_reranker = make_reranker(model_id=model_id, backend=backend, mock=mock)
_vram_mb = _estimate_vram(model_id)
logger.info("cf-reranker ready: model=%r backend=%r vram=%dMB", model_id, backend, _vram_mb)
@app.get("/health", response_model=HealthResponse)
async def health() -> HealthResponse:
return HealthResponse(
status="ok",
model=_reranker.model_id,
backend=backend,
vram_mb=_vram_mb,
)
@app.post("/rerank", response_model=RerankResponse)
async def rerank(req: RerankRequest) -> RerankResponse:
if not req.candidates:
raise HTTPException(status_code=400, detail="candidates must not be empty")
try:
results = _reranker.rerank(req.query, req.candidates, top_n=req.top_n)
except Exception as exc:
logger.exception("rerank failed")
raise HTTPException(status_code=500, detail=str(exc)) from exc
return RerankResponse(
results=[
RerankResultItem(candidate=r.candidate, score=r.score, rank=r.rank)
for r in results
],
model=_reranker.model_id,
)
return app
# ── CLI entry point ───────────────────────────────────────────────────────────
def main() -> None:
parser = argparse.ArgumentParser(description="cf-reranker — CircuitForge reranker service")
parser.add_argument(
"--model", default="BAAI/bge-reranker-base",
help="HuggingFace model ID or local path",
)
parser.add_argument(
"--backend", default="bge",
choices=["bge", "qwen3", "cross-encoder", "mock"],
help="Reranker backend",
)
parser.add_argument("--port", type=int, default=8011)
parser.add_argument("--host", default="0.0.0.0")
parser.add_argument("--gpu-id", type=int, default=0)
parser.add_argument(
"--dtype", default="float16",
choices=["float16", "bfloat16", "float32"],
)
parser.add_argument("--mock", action="store_true",
help="Run with mock backend (no GPU, for testing)")
args = parser.parse_args()
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s %(message)s",
)
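if args.backend != "mock" and not args.mock:
# Assign directly: setdefault() is a no-op when CUDA_VISIBLE_DEVICES is already set.
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = str(args.gpu_id)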
mock = args.mock or os.environ.get("CF_RERANKER_MOCK", "") == "1"
app = create_app(
model_id=args.model,
backend=args.backend,
dtype=args.dtype,
mock=mock,
)
uvicorn.run(app, host=args.host, port=args.port, log_level="info")
if __name__ == "__main__":
main()
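
The VRAM lookup in `_estimate_vram` is a plain substring match over the table keys. A minimal sketch with a two-entry excerpt of the table; `estimate_vram` and its `default_mb` parameter are hypothetical names used for illustration:

```python
# Excerpt of the VRAM table; values in MB as listed in the service module.
VRAM_TABLE: dict[str, int] = {
    "bge-reranker-base": 570,
    "Qwen3-Reranker-0.6B": 1200,
}


def estimate_vram(model_id: str, default_mb: int = 1024) -> int:
    """Return the first table entry whose key occurs in model_id."""
    for key, mb in VRAM_TABLE.items():
        if key in model_id:
            return mb
    return default_mb


print(estimate_vram("BAAI/bge-reranker-base"))
print(estimate_vram("/models/Qwen/Qwen3-Reranker-0.6B"))
# The match is case-sensitive, so a lowercased ID falls through to the default.
print(estimate_vram("qwen/qwen3-reranker-0.6b"))
```

The substring match lets both Hub IDs and local paths resolve to the same entry, at the cost of silently returning the default for any spelling the table does not contain.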


@@ -0,0 +1,4 @@
from .base import VectorMatch, VectorStore
from .sqlite_vec import LocalSQLiteVecStore
__all__ = ["VectorMatch", "VectorStore", "LocalSQLiteVecStore"]


@@ -0,0 +1,50 @@
"""
circuitforge_core.vector.base -- VectorStore ABC and shared types.
Concrete implementations: LocalSQLiteVecStore (local), QdrantStore (cloud Paid tier).
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any
@dataclass(frozen=True)
class VectorMatch:
"""A single result from a vector similarity search."""
entry_id: str
score: float # lower is better (L2 / cosine distance)
metadata: dict[str, Any] = field(default_factory=dict)
class VectorStore(ABC):
"""Abstract interface for vector storage backends."""
@abstractmethod
def upsert(
self, entry_id: str, vector: list[float], metadata: dict[str, Any]
) -> None:
"""Insert or replace a vector and its metadata."""
@abstractmethod
def query(
self,
vector: list[float],
top_k: int = 10,
filter_metadata: dict[str, Any] | None = None,
) -> list[VectorMatch]:
"""Return the top_k nearest vectors. Optional metadata filter applied post-search."""
@abstractmethod
def delete(self, entry_id: str) -> None:
"""Remove a single vector by string ID. No-op if not found."""
@abstractmethod
def delete_where(self, filter_metadata: dict[str, Any]) -> int:
"""Remove all vectors whose metadata matches all key-value pairs. Returns count removed.
Raises ValueError if filter_metadata is empty (would delete entire store).
"""
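
Because the `query` contract applies the metadata filter after the nearest-neighbour search, a filtered query can return fewer than `top_k` matches. A sketch of that behaviour using a stand-in `Match` dataclass and a hypothetical `post_filter` helper (neither name is in the source):

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass(frozen=True)
class Match:
    entry_id: str
    score: float  # distance: lower is better
    metadata: dict[str, Any] = field(default_factory=dict)


def post_filter(
    matches: list[Match], filter_metadata: dict[str, Any] | None
) -> list[Match]:
    """Keep matches whose metadata equals every key/value in the filter."""
    if not filter_metadata:
        return list(matches)
    return [
        m
        for m in matches
        if all(m.metadata.get(k) == v for k, v in filter_metadata.items())
    ]


# Pretend these are the top_k=2 nearest neighbours returned by the search.
top_2 = [Match("a", 0.1, {"lang": "en"}), Match("b", 0.2, {"lang": "de"})]
filtered = post_filter(top_2, {"lang": "en"})
print([m.entry_id for m in filtered])
```

Callers that need a full `top_k` after filtering would have to over-fetch and trim; whether any backend does so is not stated in the source.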


@@ -0,0 +1,185 @@
# circuitforge_core/vector/sqlite_vec.py
"""
circuitforge_core.vector.sqlite_vec -- sqlite-vec backed VectorStore.
Suitable for single-user local deployments. Cloud Paid tier replaces
this with QdrantStore via the same VectorStore ABC.
"""
from __future__ import annotations
import json
import logging
import re
import sqlite3
import struct
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Generator
import sqlite_vec
from .base import VectorMatch, VectorStore
logger = logging.getLogger(__name__)
_SAFE_IDENTIFIER = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$")
def _serialize(vector: list[float]) -> bytes:
return struct.pack(f"<{len(vector)}f", *vector)
class LocalSQLiteVecStore(VectorStore):
"""
VectorStore backed by sqlite-vec virtual tables.
Uses two tables per logical store:
- ``<table>_vecs``: vec0 virtual table (rowid-indexed float vectors)
- ``<table>_meta``: companion table mapping rowid to string ID + JSON metadata
Args:
db_path: Path to SQLite database file.
table: Logical name prefix (default ``"vecs"``).
dimensions: Vector length; must match the embedding model (default 768).
"""
def __init__(
self,
db_path: str | Path,
table: str = "vecs",
dimensions: int = 768,
) -> None:
if not _SAFE_IDENTIFIER.match(table):
raise ValueError(
f"table must be a valid SQL identifier (letters, digits, underscores): {table!r}"
)
self.db_path = str(db_path)
self.table = table
self.dimensions = dimensions
self._init_tables()
@contextmanager
def _conn(self) -> Generator[sqlite3.Connection, None, None]:
conn = sqlite3.connect(self.db_path)
conn.enable_load_extension(True)
sqlite_vec.load(conn)
conn.enable_load_extension(False)
conn.row_factory = sqlite3.Row
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
def _init_tables(self) -> None:
with self._conn() as conn:
conn.execute(f"""
CREATE VIRTUAL TABLE IF NOT EXISTS {self.table}_vecs
USING vec0(embedding float[{self.dimensions}])
""")
conn.execute(f"""
CREATE TABLE IF NOT EXISTS {self.table}_meta (
rowid INTEGER PRIMARY KEY,
entry_id TEXT NOT NULL UNIQUE,
metadata TEXT NOT NULL DEFAULT '{{}}'
)
""")
def upsert(
self, entry_id: str, vector: list[float], metadata: dict[str, Any]
) -> None:
with self._conn() as conn:
row = conn.execute(
f"SELECT rowid FROM {self.table}_meta WHERE entry_id = ?", [entry_id]
).fetchone()
if row:
rowid = row["rowid"]
conn.execute(
f"UPDATE {self.table}_vecs SET embedding = ? WHERE rowid = ?",
[_serialize(vector), rowid],
)
conn.execute(
f"UPDATE {self.table}_meta SET metadata = ? WHERE rowid = ?",
[json.dumps(metadata), rowid],
)
else:
cursor = conn.execute(
f"INSERT INTO {self.table}_meta(entry_id, metadata) VALUES (?, ?)",
[entry_id, json.dumps(metadata)],
)
rowid = cursor.lastrowid
conn.execute(
f"INSERT INTO {self.table}_vecs(rowid, embedding) VALUES (?, ?)",
[rowid, _serialize(vector)],
)
def query(
self,
vector: list[float],
top_k: int = 10,
filter_metadata: dict[str, Any] | None = None,
) -> list[VectorMatch]:
with self._conn() as conn:
rows = conn.execute(
f"""
SELECT m.entry_id, v.distance, m.metadata
FROM {self.table}_vecs v
JOIN {self.table}_meta m ON m.rowid = v.rowid
WHERE v.embedding MATCH ? AND k = ?
ORDER BY v.distance
""",
[_serialize(vector), top_k],
).fetchall()
results = [
VectorMatch(
entry_id=r["entry_id"],
score=r["distance"],
metadata=json.loads(r["metadata"]),
)
for r in rows
]
if filter_metadata:
results = [
r
for r in results
if all(r.metadata.get(k) == v for k, v in filter_metadata.items())
]
return results
def delete(self, entry_id: str) -> None:
with self._conn() as conn:
row = conn.execute(
f"SELECT rowid FROM {self.table}_meta WHERE entry_id = ?", [entry_id]
).fetchone()
if row:
rowid = row["rowid"]
conn.execute(f"DELETE FROM {self.table}_vecs WHERE rowid = ?", [rowid])
conn.execute(f"DELETE FROM {self.table}_meta WHERE rowid = ?", [rowid])
def delete_where(self, filter_metadata: dict[str, Any]) -> int:
if not filter_metadata:
raise ValueError(
"delete_where requires a non-empty filter; refusing to delete entire store"
)
with self._conn() as conn:
rows = conn.execute(
f"SELECT rowid, metadata FROM {self.table}_meta"
).fetchall()
to_delete = [
r["rowid"]
for r in rows
if all(
json.loads(r["metadata"]).get(k) == v
for k, v in filter_metadata.items()
)
]
for rowid in to_delete:
conn.execute(f"DELETE FROM {self.table}_vecs WHERE rowid = ?", [rowid])
conn.execute(f"DELETE FROM {self.table}_meta WHERE rowid = ?", [rowid])
return len(to_delete)
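
The `_serialize` helper packs a vector as little-endian 32-bit floats, four bytes per dimension. A round-trip sketch; `deserialize` is a hypothetical inverse added for illustration and does not appear in the module:

```python
import struct


def serialize(vector: list[float]) -> bytes:
    """Pack floats as little-endian float32, the same format string as _serialize."""
    return struct.pack(f"<{len(vector)}f", *vector)


def deserialize(blob: bytes) -> list[float]:
    """Hypothetical inverse: unpack a float32 blob back into Python floats."""
    n = len(blob) // 4  # 4 bytes per float32
    return list(struct.unpack(f"<{n}f", blob))


blob = serialize([1.0, 0.5, -2.0])
print(len(blob))
print(deserialize(blob))
```

Values that are not exactly representable in single precision are rounded on the way in, so a round trip may not reproduce the original Python floats bit for bit.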


@@ -0,0 +1,11 @@
"""
circuitforge_core.video -- cf-video service: video VLM inference via Marlin-2B.
Exposes a FastAPI process (managed by cf-orch) with endpoints:
GET /health -> {"status": "ok", "model": str, "vram_mb": int}
POST /caption -> CaptionResult (scene description + timestamped events)
POST /find -> FindResult (temporal grounding span for a natural-language event)
Run as:
python -m circuitforge_core.video.app --model /path/to/NemoStation--Marlin-2B --port 8016 --gpu-id 0
"""


@@ -0,0 +1,191 @@
"""
cf-video -- FastAPI service managed by cf-orch.
Endpoints:
GET /health -> {"status": "ok", "model": str, "vram_mb": int}
POST /caption -> CaptionResponse (scene + timestamped events)
POST /find -> FindResponse (temporal grounding span)
Usage:
python -m circuitforge_core.video.app \
--model /Library/Assets/LLM/cf-video/models/NemoStation--Marlin-2B \
--port 8016 \
--gpu-id 0
The service loads the model once at startup and blocks until it is ready.
cf-orch health-polls /health before routing any inference requests.
Model requirements:
transformers >= 5.7.0
torch >= 2.11.0
torchcodec (installed)
qwen-vl-utils >= 0.0.14 (installed)
Security:
Marlin requires trust_remote_code=True. Review the model's
modeling_marlin.py before deploying on a production node.
"""
from __future__ import annotations
import argparse
import logging
import os
from typing import Any
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from circuitforge_core.video.backends.base import VideoBackend, make_video_backend
app = FastAPI(title="cf-video", version="0.1.0")
_backend: VideoBackend | None = None
# ── Request / response models ─────────────────────────────────────────────────
class CaptionRequest(BaseModel):
video_path: str = Field(..., description="Absolute path to the video file on this node")
max_new_tokens: int = Field(2048, ge=64, le=8192)
class VideoEventOut(BaseModel):
start: float
end: float
description: str
class CaptionResponse(BaseModel):
scene: str
events: list[VideoEventOut]
caption: str
model: str
class FindRequest(BaseModel):
video_path: str = Field(..., description="Absolute path to the video file on this node")
event: str = Field(..., min_length=1, description="Natural-language event description to locate")
max_new_tokens: int = Field(256, ge=32, le=2048)
class FindResponse(BaseModel):
span: list[float] | None = Field(
None,
description="[start_sec, end_sec] or null when the model could not ground the event",
)
format_ok: bool
raw: str
model: str
# ── Endpoints ─────────────────────────────────────────────────────────────────
@app.get("/health")
def health() -> dict[str, Any]:
if _backend is None:
raise HTTPException(503, detail="backend not initialised")
return {
"status": "ok",
"model": _backend.model_name,
"vram_mb": _backend.vram_mb,
}
@app.post("/caption", response_model=CaptionResponse)
def caption(req: CaptionRequest) -> CaptionResponse:
if _backend is None:
raise HTTPException(503, detail="backend not initialised")
try:
result = _backend.caption(req.video_path, max_new_tokens=req.max_new_tokens)
except FileNotFoundError as exc:
raise HTTPException(404, detail=str(exc)) from exc
except Exception as exc:
logging.exception("caption failed for %r", req.video_path)
raise HTTPException(500, detail=str(exc)) from exc
return CaptionResponse(
scene=result.scene,
events=[
VideoEventOut(start=ev.start, end=ev.end, description=ev.description)
for ev in result.events
],
caption=result.caption,
model=result.model,
)
@app.post("/find", response_model=FindResponse)
def find(req: FindRequest) -> FindResponse:
if _backend is None:
raise HTTPException(503, detail="backend not initialised")
try:
result = _backend.find(
req.video_path,
req.event,
max_new_tokens=req.max_new_tokens,
)
except FileNotFoundError as exc:
raise HTTPException(404, detail=str(exc)) from exc
except ValueError as exc:
raise HTTPException(422, detail=str(exc)) from exc
except Exception as exc:
logging.exception("find failed for %r event=%r", req.video_path, req.event)
raise HTTPException(500, detail=str(exc)) from exc
return FindResponse(
span=list(result.span) if result.span is not None else None,
format_ok=result.format_ok,
raw=result.raw,
model=result.model,
)
# ── CLI entry point ───────────────────────────────────────────────────────────
def _parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(description="cf-video service (Marlin-2B)")
p.add_argument(
"--model",
required=True,
help="Local filesystem path to the Marlin model directory (safetensors)",
)
p.add_argument("--port", type=int, default=8016)
p.add_argument("--host", default="0.0.0.0")
p.add_argument(
"--gpu-id", type=int, default=0,
help="CUDA device index in PCI bus order (matches nvidia-smi); replaces any inherited CUDA_VISIBLE_DEVICES",
)
p.add_argument("--device", default="cuda", choices=["cuda", "cpu"])
p.add_argument(
"--mock", action="store_true",
help="Run with MockVideoBackend (no GPU, for testing)",
)
return p.parse_args()
if __name__ == "__main__":
import uvicorn
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s %(message)s",
)
args = _parse_args()
# Pin GPU selection unconditionally — --gpu-id is authoritative.
# Force PCI_BUS_ID ordering so --gpu-id matches nvidia-smi (not CUDA's
# default FASTEST_FIRST, which can swap indices on multi-GPU nodes).
if args.device == "cuda" and not args.mock:
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = str(args.gpu_id)
mock = args.mock or args.model == "mock"
device = "cpu" if mock else args.device
_backend = make_video_backend(
model_path=args.model,
mock=mock,
device=device,
gpu_id=args.gpu_id,
)
uvicorn.run(app, host=args.host, port=args.port, log_level="info")
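
The difference between `setdefault` and direct assignment that motivates the GPU pinning above can be shown on a plain dict standing in for `os.environ`. In this sketch `pin_gpu` is a hypothetical helper, and the inherited value `"0"` is an assumed example of a variable left behind by a parent process:

```python
def pin_gpu(environ: dict[str, str], gpu_id: int) -> None:
    """Force PCI bus ordering and select exactly one device, overwriting any prior value."""
    environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
    environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)


# Assumed scenario: the parent environment already exported a device selection.
inherited = {"CUDA_VISIBLE_DEVICES": "0"}

soft = dict(inherited)
soft.setdefault("CUDA_VISIBLE_DEVICES", "1")  # no-op: the key already exists

hard = dict(inherited)
pin_gpu(hard, 1)  # direct assignment wins

print(soft["CUDA_VISIBLE_DEVICES"])
print(hard["CUDA_VISIBLE_DEVICES"])
```

In the real service both variables have to be set before the CUDA runtime is initialised in the process, which is why the entry point assigns them before constructing the backend.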


@@ -0,0 +1 @@
"""Video backend registry."""


@@ -0,0 +1,96 @@
"""
VideoBackend Protocol -- backend-agnostic interface for video VLM inference.
Implementations:
MarlinBackend -- NemoStation/Marlin-2B (dense captioning + temporal grounding)
MockVideoBackend -- deterministic stub for unit tests
Both endpoints accept a video_path (local filesystem path) so the service
receives pre-staged video files rather than raw byte streams. Large uploads
should be staged by the caller before hitting /caption or /find.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Protocol, runtime_checkable
# ── Result types ─────────────────────────────────────────────────────────────
@dataclass(frozen=True)
class VideoEvent:
"""A single timestamped event from a caption pass."""
start: float # seconds from video start
end: float # seconds from video start
description: str
@dataclass(frozen=True)
class CaptionResult:
"""Result from a /caption call."""
scene: str # scene-level description paragraph
events: list[VideoEvent] # timestamped event list (may be empty)
caption: str # full raw caption string from the model
model: str # model name / path
@dataclass(frozen=True)
class FindResult:
"""Result from a /find call."""
span: tuple[float, float] | None # (start_sec, end_sec) or None on parse failure
format_ok: bool # True when model output matched expected format
raw: str # raw model output for debugging
model: str
# ── Backend Protocol ─────────────────────────────────────────────────────────
@runtime_checkable
class VideoBackend(Protocol):
"""Minimal interface all video backends must satisfy."""
def caption(
self,
video_path: str,
*,
max_new_tokens: int = 2048,
) -> CaptionResult: ...
def find(
self,
video_path: str,
event: str,
*,
max_new_tokens: int = 256,
) -> FindResult: ...
@property
def model_name(self) -> str: ...
@property
def vram_mb(self) -> int: ...
# ── Factory ──────────────────────────────────────────────────────────────────
def make_video_backend(
model_path: str,
*,
mock: bool = False,
device: str = "cuda",
gpu_id: int = 0,
) -> VideoBackend:
"""Instantiate the appropriate VideoBackend.
Args:
model_path: Local filesystem path to the model directory (safetensors).
mock: When True, return MockVideoBackend (no GPU required).
device: Torch device string ("cuda" or "cpu").
gpu_id: CUDA device index. Not used by this factory; the service entry
point pins the GPU through CUDA_VISIBLE_DEVICES before calling it.
"""
if mock:
from circuitforge_core.video.backends.mock import MockVideoBackend
return MockVideoBackend(model_path)
from circuitforge_core.video.backends.marlin import MarlinBackend
return MarlinBackend(model_path=model_path, device=device)
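
Since `VideoBackend` is decorated with `@runtime_checkable`, `isinstance` checks work against it, but they only verify that the members exist. A reduced sketch with a two-member protocol; `HasCaption`, `Stub`, and `WrongSignature` are hypothetical names for illustration:

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class HasCaption(Protocol):
    def caption(self, video_path: str) -> str: ...

    @property
    def model_name(self) -> str: ...


class Stub:
    def caption(self, video_path: str) -> str:
        return "stub caption"

    @property
    def model_name(self) -> str:
        return "stub-model"


class WrongSignature:
    # Signature and return type differ from the protocol, yet the name exists.
    def caption(self) -> int:
        return 0

    model_name = "x"


print(isinstance(Stub(), HasCaption))
print(isinstance(WrongSignature(), HasCaption))  # still True: presence only
print(isinstance(object(), HasCaption))
```

A static type checker is what catches signature mismatches; the runtime check is best treated as a coarse sanity test.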


@@ -0,0 +1,184 @@
"""
MarlinBackend -- NemoStation/Marlin-2B video VLM via HuggingFace Transformers.
Marlin-2B is a decoder-only video understanding model that produces:
- Dense scene captions with second-precise event timestamps (/caption)
- Temporal grounding of natural-language events (/find)
Requirements (install separately):
pip install "transformers>=5.7.0" "torch>=2.11.0" torchcodec "qwen-vl-utils>=0.0.14" av pillow
Security note:
trust_remote_code=True is required. The model ships a custom
AutoModelForCausalLM subclass (modeling_marlin.py). Review that file
before enabling on any node. The modeling code runs in-process with
full filesystem access.
Environment variables forwarded to the model's preprocessing layer:
FORCE_QWENVL_VIDEO_READER default: torchcodec (video decode backend)
VIDEO_MAX_PIXELS default: 200704 (max pixels per frame)
FPS default: 2.0 (frame sample rate)
FPS_MAX_FRAMES default: 240 (frame cap ~2 min video)
FPS_MIN_FRAMES default: 4 (minimum frames)
"""
from __future__ import annotations
import logging
import os
from pathlib import Path
from circuitforge_core.video.backends.base import CaptionResult, FindResult, VideoEvent
logger = logging.getLogger(__name__)
# Default env overrides so torchcodec is preferred over the slower av/ffmpeg path.
_DEFAULT_ENV: dict[str, str] = {
"FORCE_QWENVL_VIDEO_READER": "torchcodec",
}
class MarlinBackend:
"""
Load Marlin-2B once, expose caption() and find() as synchronous calls.
The model is loaded eagerly in __init__: if loading fails (OOM, missing
weights, transformers version mismatch) the error propagates immediately
rather than on first inference, so cf-orch's 2-second liveness check can
catch it.
"""
def __init__(self, model_path: str, device: str = "cuda") -> None:
self._model_path = model_path
self._device = device
# Apply env defaults before importing transformers — the model's
# custom __init__.py reads these at import time.
for key, val in _DEFAULT_ENV.items():
os.environ.setdefault(key, val)
self._model = self._load_model(model_path, device)
self._vram_mb = self._estimate_vram_mb()
logger.info(
"MarlinBackend: loaded %r on %s (~%d MB VRAM)",
model_path, device, self._vram_mb,
)
# ── Loading ──────────────────────────────────────────────────────────────
def _load_model(self, model_path: str, device: str):
import torch
from transformers import AutoModelForCausalLM
# Verify weights exist before handing to transformers — gives a clear
# error instead of a cryptic trust_remote_code failure.
path = Path(model_path)
if not path.exists():
raise FileNotFoundError(
f"Marlin model directory not found: {model_path!r}. "
"Download via Avocet or: "
f"huggingface-cli download NemoStation/Marlin-2B --local-dir {model_path}"
)
logger.info("MarlinBackend: loading model from %r ...", model_path)
model = AutoModelForCausalLM.from_pretrained(
model_path,
trust_remote_code=True, # Required — custom modeling code in repo
torch_dtype=torch.bfloat16,
device_map={"": device},
)
model.eval()
logger.info("MarlinBackend: model loaded")
return model
def _estimate_vram_mb(self) -> int:
"""Read allocated VRAM from torch after load; fall back to catalog estimate."""
try:
import torch
if torch.cuda.is_available():
return int(torch.cuda.memory_allocated() / 1024 / 1024)
except Exception:
pass
return 4500 # Catalog estimate for Marlin-2B BF16
# ── Inference ────────────────────────────────────────────────────────────
def caption(
self,
video_path: str,
*,
max_new_tokens: int = 2048,
) -> CaptionResult:
"""Produce a dense caption with scene description and timestamped events."""
if not os.path.exists(video_path):
raise FileNotFoundError(f"Video file not found: {video_path!r}")
raw_result: dict = self._model.caption(
video_path,
max_new_tokens=max_new_tokens,
do_sample=False,
)
events = [
VideoEvent(
start=float(ev["start"]),
end=float(ev["end"]),
description=str(ev["description"]),
)
for ev in raw_result.get("events", [])
]
return CaptionResult(
scene=str(raw_result.get("scene", "")),
events=events,
caption=str(raw_result.get("caption", "")),
model=self.model_name,
)
def find(
self,
video_path: str,
event: str,
*,
max_new_tokens: int = 256,
) -> FindResult:
"""Ground a natural-language event query to a video time span."""
if not os.path.exists(video_path):
raise FileNotFoundError(f"Video file not found: {video_path!r}")
if not event.strip():
raise ValueError("event query must not be empty")
raw_result: dict = self._model.find(
video_path,
event=event,
max_new_tokens=max_new_tokens,
do_sample=False,
)
# Marlin returns span as a (start, end) tuple or None.
raw_span = raw_result.get("span")
span: tuple[float, float] | None = None
if raw_span is not None:
try:
span = (float(raw_span[0]), float(raw_span[1]))
except (TypeError, IndexError, ValueError):
logger.warning(
"MarlinBackend.find: could not parse span %r for event %r",
raw_span, event,
)
return FindResult(
span=span,
format_ok=bool(raw_result.get("format_ok", False)),
raw=str(raw_result.get("raw", "")),
model=self.model_name,
)
# ── Properties ───────────────────────────────────────────────────────────
@property
def model_name(self) -> str:
return self._model_path
@property
def vram_mb(self) -> int:
return self._vram_mb
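
The defensive span parsing inside `find()` can be sketched as a standalone function. `parse_span` is a hypothetical name; the exception set mirrors the one caught in the source, and the shape of the model's raw output is an assumption:

```python
from __future__ import annotations

from typing import Any


def parse_span(raw_span: Any) -> tuple[float, float] | None:
    """Coerce a (start, end) pair to floats, or return None when it cannot be parsed."""
    if raw_span is None:
        return None
    try:
        return (float(raw_span[0]), float(raw_span[1]))
    except (TypeError, IndexError, ValueError):
        # Not subscriptable, too short, or elements that are not numeric.
        return None


print(parse_span((3.5, 7.2)))
print(parse_span(["1", "2"]))
print(parse_span((1.0,)))
print(parse_span(5))
```

A mapping passed as `raw_span` would raise `KeyError`, which this tuple-oriented sketch (like the exception list it mirrors) does not catch.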


@@ -0,0 +1,68 @@
"""
MockVideoBackend -- deterministic stub for unit tests and CI.
Returns fixed CaptionResult / FindResult without any model or video I/O.
"""
from __future__ import annotations
import os
from circuitforge_core.video.backends.base import (
CaptionResult,
FindResult,
VideoEvent,
)
_MOCK_SCENE = "A mock scene with placeholder content."
_MOCK_EVENTS = [
VideoEvent(start=0.0, end=3.0, description="Mock event one"),
VideoEvent(start=3.5, end=7.2, description="Mock event two"),
]
_MOCK_CAPTION = "Scene: A mock scene with placeholder content. Events: [0.0-3.0] Mock event one. [3.5-7.2] Mock event two."
_MOCK_FIND_SPAN = (3.5, 7.2)
class MockVideoBackend:
"""No-GPU stub. Safe for import on any machine."""
def __init__(self, model_path: str = "mock") -> None:
self._model_path = model_path
def caption(
self,
video_path: str,
*,
max_new_tokens: int = 2048,
) -> CaptionResult:
if not os.path.exists(video_path):
raise FileNotFoundError(f"Video not found: {video_path!r}")
return CaptionResult(
scene=_MOCK_SCENE,
events=list(_MOCK_EVENTS),
caption=_MOCK_CAPTION,
model=self.model_name,
)
def find(
self,
video_path: str,
event: str,
*,
max_new_tokens: int = 256,
) -> FindResult:
if not os.path.exists(video_path):
raise FileNotFoundError(f"Video not found: {video_path!r}")
return FindResult(
span=_MOCK_FIND_SPAN,
format_ok=True,
raw="From 3.5 to 7.2.",
model=self.model_name,
)
@property
def model_name(self) -> str:
return self._model_path
@property
def vram_mb(self) -> int:
return 0


@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "circuitforge-core"
-version = "0.15.0"
+version = "0.21.0"
description = "Shared scaffold for CircuitForge products (MIT)"
requires-python = ">=3.11"
dependencies = [
@@ -66,6 +66,21 @@ musicgen-service = [
"uvicorn[standard]>=0.29",
"python-multipart>=0.0.9",
]
video-marlin = [
"torch>=2.11",
"transformers>=5.7.0",
"torchvision", # On CUDA 13 nodes install from PyTorch nightly: pip install --index-url https://download.pytorch.org/whl/nightly/cu130 torch torchvision
"torchcodec",
"qwen-vl-utils>=0.0.14",
"av",
"Pillow>=10.0",
"accelerate>=0.27",
]
video-service = [
"circuitforge-core[video-marlin]",
"fastapi>=0.110",
"uvicorn[standard]>=0.29",
]
vision-siglip = [
"torch>=2.0",
"transformers>=4.40",
@@ -91,6 +106,42 @@ reranker-qwen3 = [
"transformers>=4.40",
"accelerate>=0.27",
]
reranker-cross-encoder = [
"sentence-transformers>=3.0",
]
reranker-cohere = [
"cohere>=5.0",
]
reranker-service = [
"circuitforge-core[reranker-qwen3]",
"fastapi>=0.110",
"uvicorn[standard]>=0.29",
]
gestures-mediapipe = [
"mediapipe>=0.10",
"opencv-python>=4.8",
"numpy>=1.24",
]
pdf = [
"pdfplumber>=0.11",
"pytesseract>=0.3",
"Pillow>=10.0",
]
vector = [
"sqlite-vec>=0.1",
]
mqtt = [
"aiomqtt>=2.0",
]
meshtastic-serial = [
"meshtastic>=2.5",
"pypubsub>=4.0",
]
meshtastic-service = [
"circuitforge-core[mqtt,meshtastic-serial]",
"fastapi>=0.110",
"uvicorn[standard]>=0.29",
]
dev = [
"circuitforge-core[manage]",
"pytest>=8.0",


@@ -0,0 +1,107 @@
# tests/test_documents/test_pdf.py
from __future__ import annotations
from unittest.mock import MagicMock, patch
import pytest
from circuitforge_core.documents.pdf import PDFExtractor, PageChunk
def _mock_page(text: str) -> MagicMock:
page = MagicMock()
page.extract_text.return_value = text
return page
def _mock_pdf(pages: list[MagicMock]) -> MagicMock:
pdf = MagicMock()
pdf.__enter__ = MagicMock(return_value=pdf)
pdf.__exit__ = MagicMock(return_value=False)
pdf.pages = pages
return pdf
def test_chunk_pages_single_text_layer_page():
page = _mock_page(
"Fireball deals 8d6 fire damage on a failed Dexterity saving throw."
)
with patch("circuitforge_core.documents.pdf.pdfplumber") as mock_pl:
mock_pl.open.return_value = _mock_pdf([page])
chunks = PDFExtractor().chunk_pages("/fake/book.pdf")
assert len(chunks) == 1
assert chunks[0].page_number == 1
assert chunks[0].source == "text_layer"
assert "Fireball" in chunks[0].text
assert chunks[0].word_count >= 10
def test_chunk_pages_numbers_from_one():
pages = [_mock_page(f"Rule text for page {i} " * 10) for i in range(1, 4)]
with patch("circuitforge_core.documents.pdf.pdfplumber") as mock_pl:
mock_pl.open.return_value = _mock_pdf(pages)
chunks = PDFExtractor().chunk_pages("/fake/book.pdf")
assert [c.page_number for c in chunks] == [1, 2, 3]
def test_page_chunk_is_frozen():
chunk = PageChunk(page_number=1, text="hello", source="text_layer", word_count=1)
with pytest.raises(Exception):
chunk.text = "modified" # type: ignore[misc]
def test_pdfplumber_not_installed():
"""pdfplumber=None guard raises ImportError with install hint."""
import circuitforge_core.documents.pdf as pdf_mod
with patch.object(pdf_mod, "pdfplumber", None):
with pytest.raises(ImportError, match="pdfplumber"):
PDFExtractor().chunk_pages("/fake/book.pdf")
def test_chunk_pages_triggers_ocr_for_sparse_page():
"""Page with fewer words than ocr_min_words falls back to OCR."""
sparse_page = _mock_page("few words only") # 3 words < default 10
mock_image = MagicMock()
rendered = MagicMock()
rendered.original = mock_image
sparse_page.to_image.return_value = rendered
with (
patch("circuitforge_core.documents.pdf.pdfplumber") as mock_pl,
patch("circuitforge_core.documents.pdf.pytesseract") as mock_tess,
patch("circuitforge_core.documents.pdf.Image") as mock_pil,
):
mock_pl.open.return_value = _mock_pdf([sparse_page])
mock_pil.open.return_value = mock_image
mock_tess.image_to_string.return_value = (
"Full OCR extracted rulebook text about saving throws."
)
chunks = PDFExtractor(ocr_min_words=10).chunk_pages("/fake/scan.pdf")
assert chunks[0].source == "ocr"
assert "OCR extracted" in chunks[0].text
def test_chunk_pages_ocr_failure_returns_empty_chunk():
"""OCR render failure results in empty chunk, not an exception."""
sparse_page = _mock_page("")
sparse_page.to_image.side_effect = RuntimeError("render failed")
with patch("circuitforge_core.documents.pdf.pdfplumber") as mock_pl:
mock_pl.open.return_value = _mock_pdf([sparse_page])
chunks = PDFExtractor().chunk_pages("/fake/broken.pdf")
assert len(chunks) == 1
assert chunks[0].text == ""
assert chunks[0].source == "ocr"
assert chunks[0].word_count == 0
def test_chunk_pages_empty_pdf_returns_empty_list():
with patch("circuitforge_core.documents.pdf.pdfplumber") as mock_pl:
mock_pl.open.return_value = _mock_pdf([])
chunks = PDFExtractor().chunk_pages("/fake/empty.pdf")
assert chunks == []
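
The tests above pin down a fallback rule: a page whose text layer has fewer than `ocr_min_words` words is re-read with OCR, and an OCR failure yields an empty chunk rather than an exception. The sketch below illustrates that rule only. `Chunk`, `chunk_page`, and the injected `run_ocr` callable are hypothetical names, not the actual `PDFExtractor` code, which this diff does not show.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class Chunk:
    """Hypothetical stand-in for PageChunk; field names follow the tests."""

    page_number: int
    text: str
    source: str
    word_count: int


def chunk_page(
    page_number: int,
    text_layer: str,
    run_ocr: Callable[[], str],
    ocr_min_words: int = 10,
) -> Chunk:
    """Prefer the text layer; fall back to OCR when the page is sparse."""
    words = text_layer.split()
    if len(words) >= ocr_min_words:
        return Chunk(page_number, text_layer, "text_layer", len(words))
    try:
        ocr_text = run_ocr()
    except Exception:
        # Assumption: any render or OCR error degrades to an empty chunk.
        ocr_text = ""
    return Chunk(page_number, ocr_text, "ocr", len(ocr_text.split()))
```

Passing the OCR step in as a callable keeps the decision logic testable without pdfplumber, pytesseract, or PIL installed.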


@ -0,0 +1,48 @@
import numpy as np
import pytest
from unittest.mock import MagicMock, patch
@patch("circuitforge_core.input.gestures.camera.cv2")
def test_is_open_reflects_videocapture_state(mock_cv2):
from circuitforge_core.input.gestures.camera import CameraCapture
mock_cv2.VideoCapture.return_value.isOpened.return_value = True
cam = CameraCapture()
assert cam.is_open is True
mock_cv2.VideoCapture.return_value.isOpened.return_value = False
cam2 = CameraCapture()
assert cam2.is_open is False
@patch("circuitforge_core.input.gestures.camera.cv2")
def test_frames_yields_until_read_fails(mock_cv2):
from circuitforge_core.input.gestures.camera import CameraCapture
frame = np.zeros((480, 640, 3), dtype=np.uint8)
mock_cap = MagicMock()
mock_cap.isOpened.return_value = True
mock_cap.read.side_effect = [
(True, frame),
(True, frame),
(False, None), # triggers break
]
mock_cv2.VideoCapture.return_value = mock_cap
cam = CameraCapture()
collected = list(cam.frames())
assert len(collected) == 2
@patch("circuitforge_core.input.gestures.camera.cv2")
def test_context_manager_calls_release(mock_cv2):
from circuitforge_core.input.gestures.camera import CameraCapture
mock_cap = MagicMock()
mock_cv2.VideoCapture.return_value = mock_cap
with CameraCapture() as cam:
pass
mock_cap.release.assert_called_once()


@ -0,0 +1,106 @@
import numpy as np
import pytest
from unittest.mock import MagicMock, patch
from circuitforge_core.input.gestures.hands import HandsDetector, HandLandmarks
def _make_mock_results(n_hands: int = 1):
"""Build a fake mediapipe result object with n_hands detected."""
mock_results = MagicMock()
if n_hands == 0:
mock_results.multi_hand_landmarks = None
mock_results.multi_handedness = None
return mock_results
hand_landmarks = []
handedness_list = []
for i in range(n_hands):
lm = MagicMock()
lm.landmark = [
MagicMock(x=float(j) / 100, y=float(j) / 200, z=0.0) for j in range(21)
]
hand_landmarks.append(lm)
hand = MagicMock()
hand.classification = [
MagicMock(label="Right" if i == 0 else "Left", score=0.95)
]
handedness_list.append(hand)
mock_results.multi_hand_landmarks = hand_landmarks
mock_results.multi_handedness = handedness_list
return mock_results
@patch("circuitforge_core.input.gestures.hands.mp")
def test_detect_returns_empty_when_no_hands(mock_mp):
mock_mp.solutions.hands.Hands.return_value.process.return_value = (
_make_mock_results(0)
)
detector = HandsDetector()
frame = np.zeros((480, 640, 3), dtype=np.uint8)
results = detector.detect(frame)
assert results == []
@patch("circuitforge_core.input.gestures.hands.mp")
def test_detect_returns_one_hand(mock_mp):
mock_mp.solutions.hands.Hands.return_value.process.return_value = (
_make_mock_results(1)
)
detector = HandsDetector()
frame = np.zeros((480, 640, 3), dtype=np.uint8)
results = detector.detect(frame)
assert len(results) == 1
h = results[0]
assert isinstance(h, HandLandmarks)
assert h.points.shape == (21, 3)
assert h.handedness == "Right"
assert 0.0 <= h.confidence <= 1.0
@patch("circuitforge_core.input.gestures.hands.mp")
def test_detect_returns_two_hands(mock_mp):
mock_mp.solutions.hands.Hands.return_value.process.return_value = (
_make_mock_results(2)
)
detector = HandsDetector()
frame = np.zeros((480, 640, 3), dtype=np.uint8)
results = detector.detect(frame)
assert len(results) == 2
@patch("circuitforge_core.input.gestures.hands.mp")
def test_handlandmarks_is_immutable(mock_mp):
mock_mp.solutions.hands.Hands.return_value.process.return_value = (
_make_mock_results(1)
)
detector = HandsDetector()
frame = np.zeros((480, 640, 3), dtype=np.uint8)
result = detector.detect(frame)[0]
with pytest.raises((AttributeError, TypeError)):
result.handedness = (
"Left" # frozen dataclass must reject attribute reassignment
)
with pytest.raises(ValueError):
result.points[0] = np.array(
[1.0, 2.0, 3.0]
) # writeable=False must reject in-place mutation
@patch("circuitforge_core.input.gestures.hands.mp")
def test_full_pipeline_hands_to_normalized_vector(mock_mp):
"""Detect hand → normalize landmarks → get 63-element vector."""
from circuitforge_core.input.gestures.normalizer import normalize_hand
mock_mp.solutions.hands.Hands.return_value.process.return_value = (
_make_mock_results(1)
)
detector = HandsDetector()
frame = np.zeros((480, 640, 3), dtype=np.uint8)
hands = detector.detect(frame)
assert len(hands) == 1
vec = normalize_hand(hands[0].points)
assert vec.shape == (63,)
assert vec.dtype == np.float32
assert not np.any(np.isnan(vec))
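
`test_handlandmarks_is_immutable` expects two layers of immutability: attribute reassignment must fail, and in-place writes to the landmark array must fail as well. A frozen dataclass alone does not protect the contents of a NumPy array, so the array itself has to be marked read-only. The following is a minimal sketch of that pattern. `Landmarks` and `make_landmarks` are illustrative names, not the real `HandLandmarks` implementation.

```python
from dataclasses import dataclass

import numpy as np


@dataclass(frozen=True)
class Landmarks:
    """Hypothetical stand-in for HandLandmarks."""

    points: np.ndarray  # shape (21, 3)
    handedness: str
    confidence: float


def make_landmarks(raw, handedness: str, confidence: float) -> Landmarks:
    """Copy the raw coordinates and lock the copy against in-place writes."""
    pts = np.array(raw, dtype=np.float32)  # np.array copies by default
    pts.setflags(write=False)  # later item assignment raises ValueError
    return Landmarks(points=pts, handedness=handedness, confidence=confidence)
```

Reassigning a field on the frozen dataclass raises `dataclasses.FrozenInstanceError`, a subclass of `AttributeError`, which is consistent with the `(AttributeError, TypeError)` tuple the test accepts.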


@ -0,0 +1,51 @@
import numpy as np
import pytest
from circuitforge_core.input.gestures.normalizer import normalize_hand
def _synthetic_hand(scale: float = 1.0, offset: float = 0.0) -> np.ndarray:
"""21 landmarks, wrist at offset, middle MCP at offset + (scale, 0, 0)."""
pts = np.zeros((21, 3), dtype=np.float32)
# All landmarks start at the offset (roughly at the wrist)
for i in range(21):
pts[i] = [offset, 0.0, 0.0]
# Then define a few key landmarks relative to wrist
pts[0] = [offset, 0.0, 0.0] # wrist
pts[9] = [offset + scale, 0.0, 0.0] # middle MCP at distance scale from wrist
pts[1] = [offset + 0.1 * scale, 0.05 * scale, 0.0] # thumb
pts[5] = [offset + 0.4 * scale, 0.2 * scale, 0.0] # index
return pts
def test_output_shape():
pts = _synthetic_hand()
result = normalize_hand(pts)
assert result.shape == (63,)
def test_translation_invariance():
pts_a = _synthetic_hand(offset=0.0)
pts_b = _synthetic_hand(offset=5.0)
np.testing.assert_allclose(normalize_hand(pts_a), normalize_hand(pts_b), atol=1e-5)
def test_scale_invariance():
pts_small = _synthetic_hand(scale=0.5)
pts_large = _synthetic_hand(scale=2.0)
np.testing.assert_allclose(
normalize_hand(pts_small), normalize_hand(pts_large), atol=1e-5
)
def test_zero_scale_does_not_crash():
"""All landmarks at same point — degenerate hand. Should return zeros, not raise."""
pts = np.zeros((21, 3), dtype=np.float32)
result = normalize_hand(pts)
assert result.shape == (63,)
assert not np.any(np.isnan(result))
def test_dtype_is_float32():
pts = _synthetic_hand()
result = normalize_hand(pts)
assert result.dtype == np.float32
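
These tests describe the normalizer's contract: a 63-element `float32` vector that is invariant to translation and scale, with no NaN for a degenerate hand. One way to satisfy that contract is sketched below, assuming the wrist (landmark 0) is the origin and the wrist to middle-MCP (landmark 9) distance is the unit length, as the `_synthetic_hand` docstring suggests. `normalize_hand_sketch` and `eps` are illustrative; the real `normalize_hand` may differ.

```python
import numpy as np

WRIST, MIDDLE_MCP = 0, 9  # landmark indices used by _synthetic_hand above


def normalize_hand_sketch(points: np.ndarray, eps: float = 1e-8) -> np.ndarray:
    """Flatten (21, 3) landmarks into a translation- and scale-free vector."""
    pts = np.asarray(points, dtype=np.float32)
    centered = pts - pts[WRIST]  # translation invariance
    scale = float(np.linalg.norm(centered[MIDDLE_MCP]))
    if scale < eps:
        # Degenerate hand: return zeros instead of dividing by zero.
        return np.zeros(pts.size, dtype=np.float32)
    return (centered / scale).reshape(-1).astype(np.float32)
```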


@ -11,7 +11,8 @@ def _make_router(config: dict) -> LLMRouter:
def test_complete_uses_first_reachable_backend():
router = _make_router({
router = _make_router(
{
"fallback_order": ["local"],
"backends": {
"local": {
@ -20,20 +21,24 @@ def test_complete_uses_first_reachable_backend():
"model": "llama3",
"supports_images": False,
}
},
}
})
)
mock_client = MagicMock()
mock_client.chat.completions.create.return_value = MagicMock(
choices=[MagicMock(message=MagicMock(content="hello"))]
)
with patch.object(router, "_is_reachable", return_value=True), \
patch("circuitforge_core.llm.router.OpenAI", return_value=mock_client):
with (
patch.object(router, "_is_reachable", return_value=True),
patch("circuitforge_core.llm.router.OpenAI", return_value=mock_client),
):
result = router.complete("say hello")
assert result == "hello"
def test_complete_falls_back_on_unreachable_backend():
router = _make_router({
router = _make_router(
{
"fallback_order": ["unreachable", "working"],
"backends": {
"unreachable": {
@ -47,23 +52,29 @@ def test_complete_falls_back_on_unreachable_backend():
"base_url": "http://localhost:11434/v1",
"model": "llama3",
"supports_images": False,
},
},
}
}
})
)
mock_client = MagicMock()
mock_client.chat.completions.create.return_value = MagicMock(
choices=[MagicMock(message=MagicMock(content="fallback"))]
)
def reachable(url):
return "nowhere" not in url
with patch.object(router, "_is_reachable", side_effect=reachable), \
patch("circuitforge_core.llm.router.OpenAI", return_value=mock_client):
with (
patch.object(router, "_is_reachable", side_effect=reachable),
patch("circuitforge_core.llm.router.OpenAI", return_value=mock_client),
):
result = router.complete("test")
assert result == "fallback"
def test_complete_raises_when_all_backends_exhausted():
router = _make_router({
router = _make_router(
{
"fallback_order": ["dead"],
"backends": {
"dead": {
@ -72,8 +83,9 @@ def test_complete_raises_when_all_backends_exhausted():
"model": "x",
"supports_images": False,
}
},
}
})
)
with patch.object(router, "_is_reachable", return_value=False):
with pytest.raises(RuntimeError, match="exhausted"):
router.complete("test")
@ -83,6 +95,242 @@ def test_try_cf_orch_alloc_import_path():
"""Verify lazy import points to circuitforge_orch, not circuitforge_core.resources."""
import inspect
from circuitforge_core.llm import router as router_module
src = inspect.getsource(router_module.LLMRouter._try_cf_orch_alloc)
assert "circuitforge_orch.client" in src
assert "circuitforge_core.resources.client" not in src
def test_embed_returns_vectors_from_openai_compat_backend():
router = _make_router(
{
"fallback_order": ["ollama"],
"backends": {
"ollama": {
"type": "openai_compat",
"base_url": "http://localhost:11434/v1",
"model": "mistral:7b",
"embedding_model": "nomic-embed-text",
"supports_images": False,
}
},
}
)
mock_client = MagicMock()
mock_client.embeddings.create.return_value = MagicMock(
data=[
MagicMock(embedding=[0.1, 0.2, 0.3]),
MagicMock(embedding=[0.4, 0.5, 0.6]),
]
)
with (
patch.object(router, "_is_reachable", return_value=True),
patch("circuitforge_core.llm.router.requests.get", return_value=MagicMock(status_code=404)),
patch("circuitforge_core.llm.router.OpenAI", return_value=mock_client),
):
result = router.embed(["hello world", "fireball rules"])
assert result == [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]]
mock_client.embeddings.create.assert_called_once_with(
model="nomic-embed-text",
input=["hello world", "fireball rules"],
)
def test_embed_uses_chat_model_when_no_embedding_model_configured():
router = _make_router(
{
"fallback_order": ["ollama"],
"backends": {
"ollama": {
"type": "openai_compat",
"base_url": "http://localhost:11434/v1",
"model": "llama3",
"supports_images": False,
}
},
}
)
mock_client = MagicMock()
mock_client.embeddings.create.return_value = MagicMock(
data=[MagicMock(embedding=[0.9, 0.8])]
)
with (
patch.object(router, "_is_reachable", return_value=True),
patch("circuitforge_core.llm.router.requests.get", return_value=MagicMock(status_code=404)),
patch("circuitforge_core.llm.router.OpenAI", return_value=mock_client),
):
router.embed(["test"])
call_kwargs = mock_client.embeddings.create.call_args
assert call_kwargs.kwargs["model"] == "llama3"
def test_embed_skips_non_openai_compat_backends():
router = _make_router(
{
"fallback_order": ["anthropic", "ollama"],
"backends": {
"anthropic": {
"type": "anthropic",
"enabled": True,
"model": "claude-haiku-4-5-20251001",
"api_key_env": "ANTHROPIC_API_KEY",
"supports_images": True,
},
"ollama": {
"type": "openai_compat",
"base_url": "http://localhost:11434/v1",
"model": "nomic-embed-text",
"supports_images": False,
},
},
}
)
mock_client = MagicMock()
mock_client.embeddings.create.return_value = MagicMock(
data=[MagicMock(embedding=[0.1])]
)
mock_openai = MagicMock(return_value=mock_client)
with (
patch.object(router, "_is_reachable", return_value=True),
patch("circuitforge_core.llm.router.requests.get", return_value=MagicMock(status_code=404)),
patch("circuitforge_core.llm.router.OpenAI", mock_openai),
):
result = router.embed(["hello"])
assert result == [[0.1]]
# Only ollama reached the OpenAI constructor; anthropic was skipped by type check
mock_openai.assert_called_once()
def test_embed_raises_when_all_backends_exhausted():
router = _make_router(
{
"fallback_order": ["dead"],
"backends": {
"dead": {
"type": "openai_compat",
"base_url": "http://nowhere:1/v1",
"model": "x",
"supports_images": False,
}
},
}
)
with patch.object(router, "_is_reachable", return_value=False):
with pytest.raises(RuntimeError, match="exhausted"):
router.embed(["test"])
# ── #59: LLMRouter dict init ──────────────────────────────────────────────────
def test_init_accepts_inline_dict():
config = {
"fallback_order": ["local"],
"backends": {
"local": {
"type": "openai_compat",
"base_url": "http://localhost:11434/v1",
"model": "llama3",
"supports_images": False,
}
},
}
router = LLMRouter(config)
assert router.config["fallback_order"] == ["local"]
assert "local" in router.config["backends"]
def test_init_dict_is_used_directly():
config = {"fallback_order": [], "backends": {}}
router = LLMRouter(config)
assert router.config is config
# ── #60: Ollama embedding model preflight ─────────────────────────────────────
def _ollama_backend(model: str = "nomic-embed-text") -> dict:
return {
"fallback_order": ["ollama"],
"backends": {
"ollama": {
"type": "openai_compat",
"base_url": "http://localhost:11434/v1",
"embedding_model": model,
"model": "mistral:7b",
"supports_images": False,
}
},
}
def test_embed_raises_actionable_error_when_model_not_pulled():
router = _make_router(_ollama_backend("nomic-embed-text"))
tags_resp = MagicMock(status_code=200)
tags_resp.json.return_value = {"models": [{"name": "mistral:latest"}]}
with (
patch.object(router, "_is_reachable", return_value=True),
patch("circuitforge_core.llm.router.requests.get", return_value=tags_resp),
):
with pytest.raises(RuntimeError, match='ollama pull nomic-embed-text'):
router.embed(["hello"])
def test_embed_proceeds_when_model_is_pulled():
router = _make_router(_ollama_backend("nomic-embed-text"))
tags_resp = MagicMock(status_code=200)
tags_resp.json.return_value = {
"models": [{"name": "nomic-embed-text:latest"}, {"name": "mistral:latest"}]
}
mock_client = MagicMock()
mock_client.embeddings.create.return_value = MagicMock(
data=[MagicMock(embedding=[0.1, 0.2])]
)
with (
patch.object(router, "_is_reachable", return_value=True),
patch("circuitforge_core.llm.router.requests.get", return_value=tags_resp),
patch("circuitforge_core.llm.router.OpenAI", return_value=mock_client),
):
result = router.embed(["hello"])
assert result == [[0.1, 0.2]]
def test_embed_skips_preflight_when_tags_endpoint_unavailable():
"""Non-Ollama backends (vLLM, etc.) don't expose /api/tags — check must be silent."""
router = _make_router(_ollama_backend("custom-embed"))
tags_resp = MagicMock(status_code=404)
mock_client = MagicMock()
mock_client.embeddings.create.return_value = MagicMock(
data=[MagicMock(embedding=[0.5])]
)
with (
patch.object(router, "_is_reachable", return_value=True),
patch("circuitforge_core.llm.router.requests.get", return_value=tags_resp),
patch("circuitforge_core.llm.router.OpenAI", return_value=mock_client),
):
result = router.embed(["hello"])
assert result == [[0.5]]
def test_ollama_tags_cache_is_hit_only_once():
router = _make_router(_ollama_backend("nomic-embed-text"))
tags_resp = MagicMock(status_code=200)
tags_resp.json.return_value = {"models": [{"name": "nomic-embed-text:latest"}]}
mock_client = MagicMock()
mock_client.embeddings.create.return_value = MagicMock(
data=[MagicMock(embedding=[0.1])]
)
with (
patch.object(router, "_is_reachable", return_value=True),
patch("circuitforge_core.llm.router.requests.get", return_value=tags_resp) as mock_get,
patch("circuitforge_core.llm.router.OpenAI", return_value=mock_client),
):
router.embed(["first"])
router.embed(["second"])
# /api/tags is called once (cache hit on second embed)
tags_calls = [c for c in mock_get.call_args_list if "api/tags" in str(c)]
assert len(tags_calls) == 1
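
The preflight tests imply a name-matching step: the configured `embedding_model` is `nomic-embed-text`, while the mocked `/api/tags` payload lists `nomic-embed-text:latest`. The sketch below shows one plausible matching rule over that payload shape. `is_model_pulled` is a hypothetical helper, and the exact rule used by `LLMRouter` is an assumption, since the router source is not part of this section.

```python
def is_model_pulled(wanted: str, tags_payload: dict) -> bool:
    """Check a model name against a payload shaped like {"models": [{"name": ...}]}."""
    names = [m.get("name", "") for m in tags_payload.get("models", [])]
    if ":" in wanted:
        # An explicit tag was requested, so require an exact match.
        return wanted in names
    # No tag requested: accept any tag of the same base name.
    return any(n == wanted or n.split(":", 1)[0] == wanted for n in names)
```

When the check fails, the tests expect a `RuntimeError` whose message contains `ollama pull <model>`, so the caller has something actionable.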


@ -0,0 +1,106 @@
"""Tests for CohereTextReranker with mocked cohere client."""
from __future__ import annotations
from unittest.mock import MagicMock, patch
import pytest
from circuitforge_core.reranker.adapters.cohere import CohereTextReranker
def _make_cohere_result(index: int, score: float) -> MagicMock:
r = MagicMock()
r.index = index
r.relevance_score = score
return r
def _make_mock_client(results: list[MagicMock]) -> MagicMock:
response = MagicMock()
response.results = results
client = MagicMock()
client.rerank.return_value = response
return client
def test_model_id_includes_model_name():
r = CohereTextReranker(model="rerank-multilingual-v3.0")
assert r.model_id == "cohere:rerank-multilingual-v3.0"
def test_raises_without_cohere_package():
reranker = CohereTextReranker(api_key="co-test")
with patch("circuitforge_core.reranker.adapters.cohere._cohere", None):
with pytest.raises(ImportError, match="cohere"):
reranker._score_pairs("q", ["doc"])
def test_raises_without_api_key(monkeypatch):
monkeypatch.delenv("COHERE_API_KEY", raising=False)
reranker = CohereTextReranker() # no api_key arg
mock_cohere = MagicMock()
with patch("circuitforge_core.reranker.adapters.cohere._cohere", mock_cohere):
with pytest.raises(RuntimeError, match="API key"):
reranker._get_client()
def test_reads_api_key_from_env(monkeypatch):
monkeypatch.setenv("COHERE_API_KEY", "co-fromenv")
mock_cohere = MagicMock()
with patch("circuitforge_core.reranker.adapters.cohere._cohere", mock_cohere):
reranker = CohereTextReranker()
reranker._get_client()
mock_cohere.Client.assert_called_once_with(api_key="co-fromenv")
def test_score_pairs_returns_original_order():
"""Cohere returns results sorted by score; we must restore original order."""
reranker = CohereTextReranker(api_key="co-test")
# Cohere returns candidates ranked: index 2 (0.9), index 0 (0.6), index 1 (0.1)
mock_client = _make_mock_client([
_make_cohere_result(index=2, score=0.9),
_make_cohere_result(index=0, score=0.6),
_make_cohere_result(index=1, score=0.1),
])
with patch.object(reranker, "_get_client", return_value=mock_client):
scores = reranker._score_pairs("query", ["a", "b", "c"])
# Original order: a=0.6, b=0.1, c=0.9
assert scores == [pytest.approx(0.6), pytest.approx(0.1), pytest.approx(0.9)]
def test_rerank_sorts_correctly():
reranker = CohereTextReranker(api_key="co-test")
mock_client = _make_mock_client([
_make_cohere_result(index=1, score=0.95),
_make_cohere_result(index=0, score=0.3),
])
with patch.object(reranker, "_get_client", return_value=mock_client):
results = reranker.rerank("query", ["less relevant", "more relevant"])
assert results[0].candidate == "more relevant"
assert results[0].rank == 0
def test_rerank_top_n():
reranker = CohereTextReranker(api_key="co-test")
mock_client = _make_mock_client([
_make_cohere_result(index=0, score=0.9),
_make_cohere_result(index=1, score=0.5),
_make_cohere_result(index=2, score=0.1),
])
with patch.object(reranker, "_get_client", return_value=mock_client):
results = reranker.rerank("q", ["a", "b", "c"], top_n=2)
assert len(results) == 2
def test_rerank_calls_cohere_with_correct_args():
reranker = CohereTextReranker(api_key="co-test", model="rerank-english-v3.0")
mock_client = _make_mock_client([_make_cohere_result(index=0, score=0.8)])
with patch.object(reranker, "_get_client", return_value=mock_client):
reranker.rerank("my query", ["only doc"])
mock_client.rerank.assert_called_once_with(
query="my query",
documents=["only doc"],
model="rerank-english-v3.0",
top_n=1,
max_chunks_per_doc=1,
)
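
`test_score_pairs_returns_original_order` relies on each result carrying the `index` of the input document it refers to. Restoring input order is then a scatter by index, as the sketch below shows. `RankedResult` and `scores_in_input_order` are illustrative names, not part of the adapter or of the cohere package.

```python
from dataclasses import dataclass


@dataclass
class RankedResult:
    """Hypothetical stand-in for one rerank result (index + relevance_score)."""

    index: int
    relevance_score: float


def scores_in_input_order(results: list[RankedResult], n_candidates: int) -> list[float]:
    """Place each score at the position of the candidate it refers to."""
    scores = [0.0] * n_candidates  # assumption: missing candidates score 0.0
    for r in results:
        scores[r.index] = float(r.relevance_score)
    return scores
```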


@ -0,0 +1,77 @@
"""Tests for CrossEncoderTextReranker with mocked sentence-transformers."""
from __future__ import annotations
from unittest.mock import MagicMock, patch
import pytest
from circuitforge_core.reranker.adapters.cross_encoder import CrossEncoderTextReranker
def _make_mock_cross_encoder(scores: list[float]) -> MagicMock:
m = MagicMock()
m.predict.return_value = scores
return m
def test_model_id():
assert (
CrossEncoderTextReranker("mixedbread-ai/mxbai-rerank-base-v1").model_id
== "mixedbread-ai/mxbai-rerank-base-v1"
)
def test_load_raises_without_sentence_transformers():
reranker = CrossEncoderTextReranker()
with patch("circuitforge_core.reranker.adapters.cross_encoder._CrossEncoder", None):
with pytest.raises(ImportError, match="sentence-transformers"):
reranker.load()
def test_rerank_scores_and_sorts():
reranker = CrossEncoderTextReranker("mixedbread-ai/mxbai-rerank-base-v1")
reranker._model = _make_mock_cross_encoder([0.2, 0.9, 0.5])
results = reranker.rerank("query", ["a", "b", "c"])
assert results[0].candidate == "b"
assert results[0].rank == 0
assert results[2].candidate == "a"
def test_rerank_top_n():
reranker = CrossEncoderTextReranker()
reranker._model = _make_mock_cross_encoder([0.1, 0.8, 0.5])
results = reranker.rerank("q", ["a", "b", "c"], top_n=2)
assert len(results) == 2
assert results[0].candidate == "b"
def test_predict_called_with_pairs():
reranker = CrossEncoderTextReranker()
mock_model = _make_mock_cross_encoder([0.7, 0.3])
reranker._model = mock_model
reranker.rerank("chicken soup", ["recipe one", "recipe two"])
pairs = mock_model.predict.call_args[0][0]
assert pairs == [("chicken soup", "recipe one"), ("chicken soup", "recipe two")]
def test_numpy_scores_coerced_to_float():
"""predict() may return numpy floats — verify they're converted cleanly."""
try:
import numpy as np
numpy_scores = np.array([0.8, 0.2])
except ImportError:
pytest.skip("numpy not installed")
reranker = CrossEncoderTextReranker()
reranker._model = _make_mock_cross_encoder(numpy_scores) # type: ignore[arg-type]
results = reranker.rerank("q", ["a", "b"])
assert isinstance(results[0].score, float)
def test_unload_clears_model():
reranker = CrossEncoderTextReranker()
reranker._model = MagicMock()
reranker.unload()
assert reranker._model is None


@ -0,0 +1,185 @@
"""Tests for Qwen3TextReranker with mocked transformers."""
from __future__ import annotations
from unittest.mock import MagicMock, patch
import pytest
from circuitforge_core.reranker.adapters.qwen3 import Qwen3TextReranker, _ASSISTANT_PREFILL
# ── Helpers ───────────────────────────────────────────────────────────────────
def _make_mock_model(yes_logit: float = 5.0, no_logit: float = 1.0, batch_size: int = 1):
"""Return a mock AutoModelForCausalLM that outputs fixed yes/no logits."""
import torch
model = MagicMock()
# Simulate logits: (batch, seq_len=1, vocab_size=32000)
# yes token id = 9693, no token id = 2201 (Qwen tokenizer typical values)
vocab_size = 32000
logits = torch.zeros(batch_size, 1, vocab_size)
logits[:, :, 9693] = yes_logit # "yes" token
logits[:, :, 2201] = no_logit # "no" token
output = MagicMock()
output.logits = logits
model.return_value = output
# next(model.parameters()).device
param = MagicMock()
param.device = torch.device("cpu")
model.parameters.return_value = iter([param])
return model
def _make_mock_tokenizer(yes_id: int = 9693, no_id: int = 2201):
"""Return a mock AutoTokenizer."""
import torch
tokenizer = MagicMock()
tokenizer.encode.side_effect = lambda text, **kw: (
[yes_id] if text == "yes" else [no_id]
)
tokenizer.apply_chat_template.return_value = "<prompt>"
tokenizer.padding_side = "right"
# Return simple fixed-length tensors from __call__
tokenizer.return_value = {
"input_ids": torch.zeros(1, 10, dtype=torch.long),
"attention_mask": torch.ones(1, 10, dtype=torch.long),
}
return tokenizer
# ── Unit tests ────────────────────────────────────────────────────────────────
def test_load_raises_without_torch():
reranker = Qwen3TextReranker("Qwen/Qwen3-Reranker-0.6B")
with patch("circuitforge_core.reranker.adapters.qwen3._torch", None):
with pytest.raises(ImportError, match="torch"):
reranker.load()
def test_load_raises_without_transformers():
reranker = Qwen3TextReranker("Qwen/Qwen3-Reranker-0.6B")
with patch("circuitforge_core.reranker.adapters.qwen3._AutoModel", None):
with pytest.raises(ImportError, match="transformers"):
reranker.load()
def test_model_id():
assert Qwen3TextReranker("Qwen/Qwen3-Reranker-1.5B").model_id == "Qwen/Qwen3-Reranker-1.5B"
def test_unload_clears_state():
reranker = Qwen3TextReranker()
reranker._model = MagicMock()
reranker._tokenizer = MagicMock()
reranker._yes_id = 1
reranker._no_id = 2
reranker.unload()
assert reranker._model is None
assert reranker._tokenizer is None
assert reranker._yes_id is None
assert reranker._no_id is None
def test_build_prompt_includes_prefill():
reranker = Qwen3TextReranker()
reranker._tokenizer = _make_mock_tokenizer()
prompt = reranker._build_prompt("what is chicken soup", "a hearty recipe")
assert _ASSISTANT_PREFILL in prompt
def test_score_batch_returns_yes_probability():
"""Higher yes_logit → score closer to 1.0."""
import torch
reranker = Qwen3TextReranker()
reranker._tokenizer = _make_mock_tokenizer()
reranker._model = _make_mock_model(yes_logit=10.0, no_logit=0.0)
reranker._yes_id = 9693
reranker._no_id = 2201
scores = reranker._score_batch("query", ["candidate"])
assert len(scores) == 1
assert scores[0] > 0.99 # softmax([10, 0])[0] ≈ 0.99995
def test_score_batch_low_yes_logit():
"""Lower yes_logit → score closer to 0.0."""
reranker = Qwen3TextReranker()
reranker._tokenizer = _make_mock_tokenizer()
reranker._model = _make_mock_model(yes_logit=0.0, no_logit=10.0)
reranker._yes_id = 9693
reranker._no_id = 2201
scores = reranker._score_batch("query", ["irrelevant candidate"])
assert scores[0] < 0.01
def test_rerank_sorts_by_score():
"""Integration through rerank() — highest yes-logit candidate should rank first."""
import torch
reranker = Qwen3TextReranker(batch_size=10)
tokenizer = _make_mock_tokenizer()
# Return tensors sized to the number of prompts to simulate a multi-candidate batch
def tokenize_side_effect(prompts, **kw):
n = len(prompts)
return {
"input_ids": torch.zeros(n, 10, dtype=torch.long),
"attention_mask": torch.ones(n, 10, dtype=torch.long),
}
tokenizer.side_effect = tokenize_side_effect
tokenizer.return_value = None # disable default return
reranker._tokenizer = tokenizer
# Simulate two candidates: first gets low yes logit, second gets high
import torch as _torch
vocab_size = 32000
batch_logits = _torch.zeros(2, 1, vocab_size)
batch_logits[0, 0, 9693] = 1.0 # candidate 0: low relevance
batch_logits[0, 0, 2201] = 5.0
batch_logits[1, 0, 9693] = 5.0 # candidate 1: high relevance
batch_logits[1, 0, 2201] = 1.0
output = MagicMock()
output.logits = batch_logits
model = MagicMock()
model.return_value = output
param = MagicMock()
param.device = _torch.device("cpu")
model.parameters.return_value = iter([param])
reranker._model = model
reranker._yes_id = 9693
reranker._no_id = 2201
results = reranker.rerank("query", ["low relevance doc", "high relevance doc"])
assert results[0].candidate == "high relevance doc"
assert results[0].rank == 0
def test_score_in_batches_splits_correctly():
"""Verify that large candidate lists are split into sub-batches."""
reranker = Qwen3TextReranker(batch_size=2)
reranker._tokenizer = _make_mock_tokenizer()
reranker._yes_id = 9693
reranker._no_id = 2201
batch_results: list[list[str]] = []  # candidate sub-batches seen by the fake scorer
def fake_score_batch(query, cands):
batch_results.append(cands)
return [0.5] * len(cands)
reranker._score_batch = fake_score_batch # type: ignore[method-assign]
scores = reranker._score_in_batches("q", ["a", "b", "c", "d", "e"])
assert len(scores) == 5
# 5 candidates with batch_size=2 → 3 sub-batches: [a,b], [c,d], [e]
assert len(batch_results) == 3
assert batch_results[2] == ["e"]
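
Two pieces of arithmetic sit behind these tests: the relevance score is the softmax probability of the "yes" token against the "no" token, and candidates are scored in slices of `batch_size`. The sketch below shows both in plain Python. `yes_probability` and `split_batches` are illustrative helpers; the adapter itself presumably does the softmax on torch tensors.

```python
import math


def yes_probability(yes_logit: float, no_logit: float) -> float:
    """Two-way softmax over the yes/no logits, returning P(yes)."""
    m = max(yes_logit, no_logit)  # subtract the max for numerical stability
    e_yes = math.exp(yes_logit - m)
    e_no = math.exp(no_logit - m)
    return e_yes / (e_yes + e_no)


def split_batches(items: list[str], batch_size: int) -> list[list[str]]:
    """Slice candidates into consecutive sub-batches of at most batch_size."""
    return [items[i : i + batch_size] for i in range(0, len(items), batch_size)]
```

With five candidates and `batch_size=2`, `split_batches` produces three sub-batches and the last one holds a single item, which is the case `test_score_in_batches_splits_correctly` checks.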


@ -0,0 +1,105 @@
"""Tests for RemoteTextReranker."""
from __future__ import annotations
from unittest.mock import MagicMock, patch
import pytest
from circuitforge_core.reranker.adapters.remote import RemoteTextReranker
def _make_mock_response(results: list[dict]) -> MagicMock:
resp = MagicMock()
resp.json.return_value = {"results": results}
resp.raise_for_status = MagicMock()
return resp
def test_model_id():
assert RemoteTextReranker("http://10.0.0.1:8011").model_id == "remote"
def test_score_pairs_posts_to_rerank_endpoint():
reranker = RemoteTextReranker("http://10.0.0.1:8011")
mock_resp = _make_mock_response([
{"candidate": "doc a", "score": 0.9, "rank": 0},
{"candidate": "doc b", "score": 0.3, "rank": 1},
])
with patch("requests.post", return_value=mock_resp) as mock_post:
scores = reranker._score_pairs("query", ["doc a", "doc b"])
mock_post.assert_called_once_with(
"http://10.0.0.1:8011/rerank",
json={"query": "query", "candidates": ["doc a", "doc b"], "top_n": 0},
timeout=30,
)
assert scores == [pytest.approx(0.9), pytest.approx(0.3)]
def test_score_pairs_restores_original_order():
"""Remote may return results in any order — scores must align with input."""
reranker = RemoteTextReranker("http://10.0.0.1:8011")
# Remote returned c first (highest score), then a, then b
mock_resp = _make_mock_response([
{"candidate": "c", "score": 0.95, "rank": 0},
{"candidate": "a", "score": 0.6, "rank": 1},
{"candidate": "b", "score": 0.1, "rank": 2},
])
with patch("requests.post", return_value=mock_resp):
scores = reranker._score_pairs("q", ["a", "b", "c"])
assert scores == [pytest.approx(0.6), pytest.approx(0.1), pytest.approx(0.95)]
def test_score_pairs_raises_on_http_error():
import requests as req
reranker = RemoteTextReranker("http://10.0.0.1:8011")
with patch("requests.post", side_effect=req.ConnectionError("refused")):
with pytest.raises(RuntimeError, match="Remote reranker"):
reranker._score_pairs("q", ["doc"])
def test_rerank_end_to_end():
reranker = RemoteTextReranker("http://10.0.0.1:8011")
mock_resp = _make_mock_response([
{"candidate": "irrelevant", "score": 0.2, "rank": 0},
{"candidate": "very relevant", "score": 0.9, "rank": 1},
])
with patch("requests.post", return_value=mock_resp):
results = reranker.rerank("q", ["irrelevant", "very relevant"])
assert results[0].candidate == "very relevant"
assert results[0].rank == 0
# ── make_reranker wiring ──────────────────────────────────────────────────────
def test_make_reranker_qwen3():
from circuitforge_core.reranker import make_reranker
from circuitforge_core.reranker.adapters.qwen3 import Qwen3TextReranker
r = make_reranker("Qwen/Qwen3-Reranker-0.6B", backend="qwen3")
assert isinstance(r, Qwen3TextReranker)
def test_make_reranker_cross_encoder():
from circuitforge_core.reranker import make_reranker
from circuitforge_core.reranker.adapters.cross_encoder import CrossEncoderTextReranker
r = make_reranker("mixedbread-ai/mxbai-rerank-base-v1", backend="cross-encoder")
assert isinstance(r, CrossEncoderTextReranker)
def test_make_reranker_cohere():
from circuitforge_core.reranker import make_reranker
from circuitforge_core.reranker.adapters.cohere import CohereTextReranker
r = make_reranker("rerank-english-v3.0", backend="cohere")
assert isinstance(r, CohereTextReranker)
def test_make_reranker_remote():
from circuitforge_core.reranker import make_reranker
r = make_reranker("http://10.0.0.1:8011", backend="remote")
assert isinstance(r, RemoteTextReranker)
def test_make_reranker_unknown_raises():
from circuitforge_core.reranker import make_reranker
with pytest.raises(ValueError, match="cross-encoder"):
make_reranker(backend="unknown-backend")


@ -0,0 +1,102 @@
"""Tests for VectorStore ABC and VectorMatch."""
from __future__ import annotations

from dataclasses import FrozenInstanceError

import pytest

from circuitforge_core.vector.base import VectorMatch, VectorStore


class _ConcreteStore(VectorStore):
    """Minimal in-memory implementation for testing the ABC contract."""

    def __init__(self) -> None:
        self._data: dict[str, tuple[list[float], dict]] = {}

    def upsert(self, entry_id: str, vector: list[float], metadata: dict) -> None:
        self._data[entry_id] = (vector, metadata)

    def query(
        self,
        vector: list[float],
        top_k: int = 10,
        filter_metadata: dict | None = None,
    ) -> list[VectorMatch]:
        results = [
            VectorMatch(entry_id=k, score=0.0, metadata=v[1])
            for k, v in self._data.items()
        ]
        if filter_metadata:
            results = [
                r
                for r in results
                if all(r.metadata.get(k) == val for k, val in filter_metadata.items())
            ]
        return results[:top_k]

    def delete(self, entry_id: str) -> None:
        self._data.pop(entry_id, None)

    def delete_where(self, filter_metadata: dict) -> int:
        to_remove = [
            k
            for k, (_, meta) in self._data.items()
            if all(meta.get(fk) == fv for fk, fv in filter_metadata.items())
        ]
        for k in to_remove:
            del self._data[k]
        return len(to_remove)
def test_vector_match_is_frozen():
    match = VectorMatch(entry_id="a", score=0.1, metadata={})
    with pytest.raises(FrozenInstanceError):
        match.score = 0.5  # type: ignore[misc]


def test_vector_match_metadata_is_dict():
    match = VectorMatch(entry_id="a", score=0.1, metadata={"k": "v"})
    assert isinstance(match.metadata, dict)
    assert match.metadata["k"] == "v"


def test_upsert_and_query():
    store = _ConcreteStore()
    store.upsert("chunk-1", [0.1, 0.2], {"doc_id": "book-a", "page": 1})
    results = store.query([0.1, 0.2])
    assert len(results) == 1
    assert results[0].entry_id == "chunk-1"
    assert results[0].metadata["page"] == 1


def test_query_filter_metadata():
    store = _ConcreteStore()
    store.upsert("c1", [0.1], {"doc_id": "book-a"})
    store.upsert("c2", [0.2], {"doc_id": "book-b"})
    results = store.query([0.1], filter_metadata={"doc_id": "book-a"})
    assert len(results) == 1
    assert results[0].entry_id == "c1"


def test_delete():
    store = _ConcreteStore()
    store.upsert("x", [0.1], {})
    store.delete("x")
    assert store.query([0.1]) == []


def test_delete_where():
    store = _ConcreteStore()
    store.upsert("c1", [0.1], {"doc_id": "book-a"})
    store.upsert("c2", [0.2], {"doc_id": "book-a"})
    store.upsert("c3", [0.3], {"doc_id": "book-b"})
    count = store.delete_where({"doc_id": "book-a"})
    assert count == 2
    assert len(store.query([0.1])) == 1


def test_cannot_instantiate_abc_directly():
    with pytest.raises(TypeError):
        VectorStore()  # type: ignore[abstract]
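
The tests above imply a contract for `VectorMatch` and `VectorStore` without showing their definitions. A minimal sketch of what such a base module might look like, assuming a frozen dataclass and an `abc.ABC` base class; the real `circuitforge_core.vector.base` is not shown in this diff, so everything beyond the names and signatures used by the tests is a guess:

```python
from abc import ABC, abstractmethod
from dataclasses import FrozenInstanceError, dataclass
from typing import List, Optional


@dataclass(frozen=True)
class VectorMatch:
    """Hypothetical result record; frozen=True makes attribute assignment raise."""

    entry_id: str
    score: float
    metadata: dict


class VectorStore(ABC):
    """Hypothetical ABC; the abstract methods block direct instantiation."""

    @abstractmethod
    def upsert(self, entry_id: str, vector: List[float], metadata: dict) -> None: ...

    @abstractmethod
    def query(
        self,
        vector: List[float],
        top_k: int = 10,
        filter_metadata: Optional[dict] = None,
    ) -> List[VectorMatch]: ...

    @abstractmethod
    def delete(self, entry_id: str) -> None: ...

    @abstractmethod
    def delete_where(self, filter_metadata: dict) -> int: ...
```

Note that `frozen=True` only protects the fields themselves: the `metadata` dict inside a match can still be mutated in place.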


@ -0,0 +1,82 @@
# tests/test_vector/test_sqlite_vec.py
"""Integration tests for LocalSQLiteVecStore (uses a real in-memory sqlite-vec DB)."""
from __future__ import annotations

import pytest

from circuitforge_core.vector.sqlite_vec import LocalSQLiteVecStore

DIMS = 4  # small dimension for tests


@pytest.fixture
def store(tmp_path) -> LocalSQLiteVecStore:
    return LocalSQLiteVecStore(db_path=tmp_path / "vecs.db", dimensions=DIMS)


def _vec(val: float) -> list[float]:
    return [val] * DIMS


def test_upsert_and_query_returns_match(store):
    store.upsert("doc-1::p1", _vec(0.1), {"doc_id": "doc-1", "page": 1})
    results = store.query(_vec(0.1), top_k=5)
    assert len(results) == 1
    assert results[0].entry_id == "doc-1::p1"
    assert results[0].metadata["page"] == 1


def test_upsert_replaces_existing(store):
    store.upsert("chunk-1", _vec(0.1), {"page": 1})
    store.upsert("chunk-1", _vec(0.9), {"page": 99})
    # Metadata check: the second upsert overwrites the first.
    results = store.query(_vec(0.9), top_k=5)
    assert results[0].metadata["page"] == 99
    # Vector check: the stored vector is now _vec(0.9), so querying with it
    # should return a lower (closer) score than querying with the old _vec(0.1).
    old_results = store.query(_vec(0.1), top_k=5)
    new_results = store.query(_vec(0.9), top_k=5)
    assert new_results[0].score < old_results[0].score


def test_query_respects_top_k(store):
    for i in range(5):
        store.upsert(f"chunk-{i}", _vec(float(i) * 0.1), {"i": i})
    results = store.query(_vec(0.0), top_k=2)
    assert len(results) == 2


def test_filter_metadata(store):
    store.upsert("c1", _vec(0.1), {"doc_id": "book-a"})
    store.upsert("c2", _vec(0.2), {"doc_id": "book-b"})
    results = store.query(_vec(0.1), filter_metadata={"doc_id": "book-a"})
    assert all(r.metadata["doc_id"] == "book-a" for r in results)


def test_delete(store):
    store.upsert("x", _vec(0.5), {})
    store.delete("x")
    assert store.query(_vec(0.5)) == []


def test_delete_where(store):
    store.upsert("c1", _vec(0.1), {"doc_id": "book-a"})
    store.upsert("c2", _vec(0.2), {"doc_id": "book-a"})
    store.upsert("c3", _vec(0.3), {"doc_id": "book-b"})
    count = store.delete_where({"doc_id": "book-a"})
    assert count == 2
    assert len(store.query(_vec(0.1))) == 1


def test_delete_nonexistent_is_noop(store):
    store.delete("does-not-exist")  # should not raise


def test_empty_query_returns_empty(store):
    assert store.query(_vec(0.1)) == []


def test_delete_where_raises_on_empty_filter(store):
    store.upsert("c1", _vec(0.1), {"doc_id": "book-a"})
    with pytest.raises(ValueError, match="empty"):
        store.delete_where({})
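
`test_upsert_replaces_existing` only makes sense if the score is a distance, so that a smaller value means a closer match. As a worked illustration, assuming the store reports Euclidean (L2) distance (the metric actually used by `LocalSQLiteVecStore` is not shown in this diff, and `l2_distance` is a hypothetical helper):

```python
import math

DIMS = 4


def _vec(val: float) -> list:
    return [val] * DIMS


def l2_distance(a, b) -> float:
    """Plain Euclidean distance between two equal-length vectors."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


stored = _vec(0.9)  # the vector left in the store after the second upsert

score_new = l2_distance(_vec(0.9), stored)  # identical vectors: distance 0
score_old = l2_distance(_vec(0.1), stored)  # each of 4 components differs by 0.8
```

Under that assumption the old query vector lands at roughly sqrt(4 * 0.8^2) = 1.6 from the stored vector, while the new one lands at 0, which is the ordering the test asserts.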


@ -0,0 +1,236 @@
"""
Tests for the cf-video FastAPI app using mock backend.
Tests run without GPU, torch, or a real video file.
MockVideoBackend checks os.path.exists() but never reads video content,
so a zero-byte placeholder is sufficient.
"""
from __future__ import annotations

import pytest
from fastapi.testclient import TestClient

import circuitforge_core.video.app as video_app
from circuitforge_core.video.backends.mock import MockVideoBackend


# ── Fixtures ──────────────────────────────────────────────────────────────────

@pytest.fixture(autouse=True)
def inject_mock_backend():
    """Replace global backend with mock before each test; restore after."""
    original = video_app._backend
    video_app._backend = MockVideoBackend()
    yield
    video_app._backend = original


@pytest.fixture()
def client():
    return TestClient(video_app.app)


@pytest.fixture()
def video_file(tmp_path):
    """Placeholder file that satisfies os.path.exists() inside the mock."""
    p = tmp_path / "sample.mp4"
    p.write_bytes(b"\x00" * 16)
    return str(p)


# ── /health ───────────────────────────────────────────────────────────────────

def test_health_returns_ok(client):
    resp = client.get("/health")
    assert resp.status_code == 200
    data = resp.json()
    assert data["status"] == "ok"
    assert data["model"] == "mock"
    assert data["vram_mb"] == 0


def test_health_503_when_no_backend(client):
    video_app._backend = None
    resp = client.get("/health")
    assert resp.status_code == 503


# ── /caption ──────────────────────────────────────────────────────────────────

def test_caption_returns_200(client, video_file):
    resp = client.post("/caption", json={"video_path": video_file})
    assert resp.status_code == 200


def test_caption_response_has_scene(client, video_file):
    data = client.post("/caption", json={"video_path": video_file}).json()
    assert isinstance(data["scene"], str)
    assert data["scene"]


def test_caption_response_has_events(client, video_file):
    data = client.post("/caption", json={"video_path": video_file}).json()
    assert isinstance(data["events"], list)
    assert len(data["events"]) >= 1


def test_caption_events_have_timestamps(client, video_file):
    data = client.post("/caption", json={"video_path": video_file}).json()
    for ev in data["events"]:
        assert "start" in ev
        assert "end" in ev
        assert "description" in ev
        assert ev["start"] <= ev["end"]


def test_caption_response_has_caption(client, video_file):
    data = client.post("/caption", json={"video_path": video_file}).json()
    assert isinstance(data["caption"], str)
    assert data["caption"]


def test_caption_response_model_field(client, video_file):
    data = client.post("/caption", json={"video_path": video_file}).json()
    assert isinstance(data["model"], str)


def test_caption_404_on_missing_file(client):
    resp = client.post("/caption", json={"video_path": "/no/such/file.mp4"})
    assert resp.status_code == 404


def test_caption_503_when_no_backend(client, video_file):
    video_app._backend = None
    resp = client.post("/caption", json={"video_path": video_file})
    assert resp.status_code == 503


def test_caption_custom_max_new_tokens(client, video_file):
    resp = client.post(
        "/caption",
        json={"video_path": video_file, "max_new_tokens": 512},
    )
    assert resp.status_code == 200


def test_caption_rejects_max_new_tokens_below_min(client, video_file):
    resp = client.post(
        "/caption",
        json={"video_path": video_file, "max_new_tokens": 10},
    )
    assert resp.status_code == 422


def test_caption_rejects_max_new_tokens_above_max(client, video_file):
    resp = client.post(
        "/caption",
        json={"video_path": video_file, "max_new_tokens": 99999},
    )
    assert resp.status_code == 422


# ── /find ─────────────────────────────────────────────────────────────────────

def test_find_returns_200(client, video_file):
    resp = client.post(
        "/find",
        json={"video_path": video_file, "event": "someone waves"},
    )
    assert resp.status_code == 200


def test_find_response_has_span(client, video_file):
    data = client.post(
        "/find",
        json={"video_path": video_file, "event": "mock event"},
    ).json()
    # MockVideoBackend always returns a non-null span
    assert data["span"] is not None
    assert len(data["span"]) == 2
    assert data["span"][0] <= data["span"][1]


def test_find_span_is_list_of_floats(client, video_file):
    data = client.post(
        "/find",
        json={"video_path": video_file, "event": "mock event"},
    ).json()
    span = data["span"]
    assert all(isinstance(v, float) for v in span)


def test_find_format_ok_field(client, video_file):
    data = client.post(
        "/find",
        json={"video_path": video_file, "event": "mock event"},
    ).json()
    assert data["format_ok"] is True


def test_find_raw_field(client, video_file):
    data = client.post(
        "/find",
        json={"video_path": video_file, "event": "mock event"},
    ).json()
    assert isinstance(data["raw"], str)


def test_find_model_field(client, video_file):
    data = client.post(
        "/find",
        json={"video_path": video_file, "event": "mock event"},
    ).json()
    assert isinstance(data["model"], str)


def test_find_404_on_missing_file(client):
    resp = client.post(
        "/find",
        json={"video_path": "/no/such/file.mp4", "event": "wave"},
    )
    assert resp.status_code == 404


def test_find_503_when_no_backend(client, video_file):
    video_app._backend = None
    resp = client.post(
        "/find",
        json={"video_path": video_file, "event": "wave"},
    )
    assert resp.status_code == 503


def test_find_rejects_empty_event(client, video_file):
    resp = client.post(
        "/find",
        json={"video_path": video_file, "event": ""},
    )
    assert resp.status_code == 422


def test_find_custom_max_new_tokens(client, video_file):
    resp = client.post(
        "/find",
        json={"video_path": video_file, "event": "wave", "max_new_tokens": 128},
    )
    assert resp.status_code == 200


def test_find_rejects_max_new_tokens_below_min(client, video_file):
    resp = client.post(
        "/find",
        json={"video_path": video_file, "event": "wave", "max_new_tokens": 10},
    )
    assert resp.status_code == 422


def test_find_rejects_max_new_tokens_above_max(client, video_file):
    resp = client.post(
        "/find",
        json={"video_path": video_file, "event": "wave", "max_new_tokens": 99999},
    )
    assert resp.status_code == 422


@ -0,0 +1,157 @@
"""
Tests for MockVideoBackend and the VideoBackend protocol.
All tests run without a GPU, torch install, or any real video file
(MockVideoBackend only checks os.path.exists, not video validity).
"""
from __future__ import annotations

import os
import tempfile

import pytest

from circuitforge_core.video.backends.base import (
    CaptionResult,
    FindResult,
    VideoBackend,
    VideoEvent,
    make_video_backend,
)
from circuitforge_core.video.backends.mock import MockVideoBackend


# ── Fixtures ──────────────────────────────────────────────────────────────────

@pytest.fixture()
def video_file(tmp_path):
    """Create a temporary file that satisfies os.path.exists() checks."""
    p = tmp_path / "test.mp4"
    p.write_bytes(b"\x00" * 16)  # placeholder bytes; mock never reads content
    return str(p)


# ── Protocol conformance ──────────────────────────────────────────────────────

def test_mock_satisfies_protocol():
    backend = MockVideoBackend()
    assert isinstance(backend, VideoBackend)


def test_mock_model_name_default():
    assert MockVideoBackend().model_name == "mock"


def test_mock_model_name_custom():
    assert MockVideoBackend(model_path="custom-path").model_name == "custom-path"


def test_mock_vram_mb():
    assert MockVideoBackend().vram_mb == 0


# ── caption() ─────────────────────────────────────────────────────────────────

def test_caption_returns_caption_result(video_file):
    result = MockVideoBackend().caption(video_file)
    assert isinstance(result, CaptionResult)


def test_caption_scene_is_str(video_file):
    result = MockVideoBackend().caption(video_file)
    assert isinstance(result.scene, str)
    assert result.scene  # non-empty


def test_caption_events_are_video_events(video_file):
    result = MockVideoBackend().caption(video_file)
    assert isinstance(result.events, list)
    for ev in result.events:
        assert isinstance(ev, VideoEvent)


def test_caption_events_have_numeric_timestamps(video_file):
    result = MockVideoBackend().caption(video_file)
    for ev in result.events:
        assert isinstance(ev.start, float)
        assert isinstance(ev.end, float)
        assert ev.start <= ev.end


def test_caption_caption_str(video_file):
    result = MockVideoBackend().caption(video_file)
    assert isinstance(result.caption, str)
    assert result.caption


def test_caption_model_matches_path(video_file):
    result = MockVideoBackend(model_path="test-model").caption(video_file)
    assert result.model == "test-model"


def test_caption_raises_on_missing_file():
    with pytest.raises(FileNotFoundError):
        MockVideoBackend().caption("/nonexistent/video.mp4")


def test_caption_max_new_tokens_accepted(video_file):
    """max_new_tokens kwarg must be accepted without error."""
    result = MockVideoBackend().caption(video_file, max_new_tokens=512)
    assert isinstance(result, CaptionResult)


# ── find() ────────────────────────────────────────────────────────────────────

def test_find_returns_find_result(video_file):
    result = MockVideoBackend().find(video_file, "someone waves")
    assert isinstance(result, FindResult)


def test_find_span_is_tuple_or_none(video_file):
    result = MockVideoBackend().find(video_file, "mock event")
    # MockVideoBackend always returns a span
    assert result.span is not None
    assert len(result.span) == 2
    assert result.span[0] <= result.span[1]


def test_find_format_ok_true(video_file):
    result = MockVideoBackend().find(video_file, "mock event")
    assert result.format_ok is True


def test_find_raw_is_str(video_file):
    result = MockVideoBackend().find(video_file, "mock event")
    assert isinstance(result.raw, str)


def test_find_model_matches_path(video_file):
    result = MockVideoBackend(model_path="my-model").find(video_file, "event")
    assert result.model == "my-model"


def test_find_raises_on_missing_file():
    with pytest.raises(FileNotFoundError):
        MockVideoBackend().find("/nonexistent/video.mp4", "event")


def test_find_max_new_tokens_accepted(video_file):
    result = MockVideoBackend().find(video_file, "event", max_new_tokens=128)
    assert isinstance(result, FindResult)


# ── make_video_backend factory ────────────────────────────────────────────────

def test_factory_returns_mock_when_flag_set():
    backend = make_video_backend(model_path="mock", mock=True)
    assert isinstance(backend, MockVideoBackend)


def test_factory_mock_uses_model_path():
    backend = make_video_backend(model_path="some-path", mock=True)
    assert backend.model_name == "some-path"
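
`test_mock_satisfies_protocol` calls `isinstance` against `VideoBackend`. For a `typing.Protocol` that only works when the protocol is decorated with `runtime_checkable`. A minimal sketch of that pattern follows. `SketchBackend`, `SketchMock`, and `SketchFindResult` are hypothetical stand-ins, not the classes in `circuitforge_core.video.backends`, and the default `max_new_tokens=128` and the returned span are assumptions:

```python
import os
from dataclasses import dataclass
from typing import Optional, Protocol, Tuple, runtime_checkable


@dataclass
class SketchFindResult:
    span: Optional[Tuple[float, float]]
    format_ok: bool
    raw: str
    model: str


@runtime_checkable
class SketchBackend(Protocol):
    """Structural interface; isinstance() checks member presence only."""

    model_name: str
    vram_mb: int

    def find(
        self, video_path: str, event: str, max_new_tokens: int = 128
    ) -> SketchFindResult: ...


class SketchMock:
    """Checks that the path exists but never opens or decodes the file."""

    def __init__(self, model_path: str = "mock") -> None:
        self.model_name = model_path
        self.vram_mb = 0

    def find(
        self, video_path: str, event: str, max_new_tokens: int = 128
    ) -> SketchFindResult:
        if not os.path.exists(video_path):
            raise FileNotFoundError(video_path)
        # Fixed, well-formed answer so callers can test their plumbing.
        return SketchFindResult(
            span=(0.0, 1.0), format_ok=True, raw="0.0-1.0", model=self.model_name
        )
```

`SketchMock` never inherits from `SketchBackend`; the `isinstance` check passes purely because the required attributes and method are present. A runtime check of this kind does not validate signatures or return types.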