Compare commits

...

7 commits

Author SHA1 Message Date
5a363f3b6c fix(video): add torchvision to video-marlin extras
Some checks are pending
CI / test (push) Waiting to run
Mirror / mirror (push) Waiting to run
Missing from initial extras list — required by QwenVLVideoProcessor
at inference time. On CUDA 13 nodes must be installed from the PyTorch
nightly cu130 index to avoid a torch version downgrade:
  pip install --index-url https://download.pytorch.org/whl/nightly/cu130 torch torchvision

Discovered during Muninn deployment (2026-05-26).
2026-06-02 20:32:03 -07:00
a7d916f630 docs: add LLM development disclosure to README
Some checks failed
CI / test (push) Has been cancelled
Mirror / mirror (push) Has been cancelled
Humans own design, architecture, code review, testing, and
verification. LLMs are part of our development workflow.
Links to circuitforge.tech/positions for our full position.
2026-05-28 08:20:17 -07:00
c2ac55259d fix(video): enforce PCI_BUS_ID order + force CUDA_VISIBLE_DEVICES assignment
Some checks failed
CI / test (push) Has been cancelled
Mirror / mirror (push) Has been cancelled
CUDA defaults to FASTEST_FIRST device ordering, which does not match
nvidia-smi's PCI bus order on multi-GPU nodes. On Muninn, the RTX 3090
is cuda:0 and the Quadro RTX 4000 is cuda:1 — the opposite of nvidia-smi.

Two fixes:
1. Set CUDA_DEVICE_ORDER=PCI_BUS_ID so --gpu-id always matches nvidia-smi
   and the muninn.yaml profile GPU index assignments.
2. Use direct assignment (os.environ[...] = ...) instead of setdefault —
   setdefault silently no-ops if CUDA_VISIBLE_DEVICES is already present
   in the environment (conda activation, prior run, system default).
2026-05-26 15:07:30 -07:00
9f7fb45071 feat(video): add cf-video module — Marlin-2B FastAPI service + mock backend + tests
Some checks are pending
CI / test (push) Waiting to run
Mirror / mirror (push) Waiting to run
Add the circuitforge_core.video package implementing the cf-video inference
service managed by cf-orch.

Service endpoints:
  GET  /health     — liveness check; model name + VRAM
  POST /caption    — dense scene description + timestamped event list
  POST /find       — temporal grounding of a natural-language event query

Backend hierarchy:
  VideoBackend (Protocol)
    MarlinBackend  — NemoStation/Marlin-2B via transformers>=5.7.0
    MockVideoBackend — deterministic stub; no GPU required

Pydantic request/response models enforce parameter bounds at the API
boundary (max_new_tokens ge/le, event min_length=1).  Span is serialized
as list[float] | None for JSON compatibility.

MarlinBackend loads eagerly in __init__ so cf-orch's 2-second liveness
poll catches load failures immediately.  FORCE_QWENVL_VIDEO_READER env var
defaults to torchcodec (faster than av path) before transformers import.

pyproject.toml extras:
  video-marlin   — torch, transformers, torchcodec, qwen-vl-utils, av, Pillow
  video-service  — video-marlin + fastapi + uvicorn

Test coverage: 46 tests across test_mock_backend.py and test_app.py.
All passing without GPU or real video file.

Closes: #71
2026-05-25 20:00:37 -07:00
93d36346c1 feat(llm): task-based cf-orch allocation in LLMRouter (v0.21.0)
_try_cf_orch_alloc now checks for cf_orch.task + cf_orch.product keys.
When present, uses client.task_allocate(product, task) instead of
service-based allocate(). Supports peregrine#115 task-model routing.
Existing service-based configs are unaffected.
2026-05-17 19:59:48 -07:00
af66877b51 feat(community): recipe dedup support — similar_to_ref FK, search_similar_posts, migration 006
Some checks failed
CI / test (push) Has been cancelled
Mirror / mirror (push) Has been cancelled
Adds three-layer dedup infrastructure for community recipe posts:
- Migration 006: similar_to_ref self-FK, title lower() index, recipe_id index
- CommunityPost.similar_to_ref optional field (frozen dataclass, defaults None)
- SharedStore.search_similar_posts(): title ILIKE + recipe_id match, ordered by relevance
- insert_post() wires similar_to_ref into the INSERT
2026-05-11 17:09:18 -07:00
41c9830281 docs(readme): landing page rewrite — v0.20.0, all 28 modules, LLM router + DB usage examples, full extras install table, used-by table
Some checks failed
CI / test (push) Has been cancelled
Mirror / mirror (push) Has been cancelled
2026-05-06 08:51:54 -07:00
15 changed files with 1248 additions and 31 deletions

193
README.md
View file

@ -1,37 +1,186 @@
# circuitforge-core
<p align="center">
<img src="docs/cf-logo.png" alt="CircuitForge logo" width="120" />
</p>
Shared scaffold for CircuitForge products.
<h1 align="center">circuitforge-core</h1>
**Current version: 0.7.0**
<p align="center">Shared Python scaffold for privacy-first, self-hosted AI tools</p>
## Modules
<p align="center">
<a href="LICENSE"><img src="https://img.shields.io/badge/license-MIT-green.svg" alt="MIT License" /></a>
<img src="https://img.shields.io/badge/version-0.20.0-blue.svg" alt="v0.20.0" />
<img src="https://img.shields.io/badge/python-3.11%2B-blue.svg" alt="Python 3.11+" />
<a href="https://git.opensourcesolarpunk.com/Circuit-Forge/circuitforge-core"><img src="https://img.shields.io/badge/repo-Forgejo-orange.svg" alt="Forgejo" /></a>
</p>
### Implemented
---
- `circuitforge_core.db` — SQLite connection factory and migration runner
- `circuitforge_core.llm` — LLM router with fallback chain (Ollama, vLLM, Anthropic, OpenAI-compatible)
- `circuitforge_core.tiers` — Tier system with BYOK and local vision unlocks
- `circuitforge_core.config` — Env validation and .env loader
- `circuitforge_core.hardware` — Hardware detection and LLM backend profile generation (VRAM tiers, GPU/CPU auto-select)
- `circuitforge_core.documents` — Document ingestion pipeline: PDF, DOCX, and image OCR → `StructuredDocument`
- `circuitforge_core.affiliates` — Affiliate URL wrapping with opt-out, BYOK user IDs, and CF env-var fallback (`wrap_url`)
- `circuitforge_core.preferences` — User preference store (local YAML file, pluggable backend); dot-path get/set API
- `circuitforge_core.tasks` — VRAM-aware LLM task scheduler; shared slot manager across services (`TaskScheduler`)
- `circuitforge_core.manage` — Cross-platform product process manager (Docker and native modes)
- `circuitforge_core.resources` — Resource coordinator and agent: VRAM allocation, eviction engine, GPU profile registry
## Why circuitforge-core?
### Stubs (in-tree, not yet implemented)
- **Local inference first.** The LLM router defaults to Ollama on localhost. Cloud APIs are a configurable fallback, not the default path. No telemetry, no round-trips you didn't ask for.
- **VRAM-aware scheduling.** The task scheduler and resource coordinator track GPU memory across concurrent services, allocate slots before loading models, and evict backends gracefully when VRAM is scarce.
- **Consistent tier system across products.** One `tiers` module handles Free / Paid / Premium / Ultra tiers, BYOK (bring your own key) unlocks, and local-vision capability gates — the same way in every product.
- **Uniform developer experience.** DB migrations, config validation, document ingestion, process management, and preference storage all share a single, tested implementation. Products extend, not reimplement.
- `circuitforge_core.vision` — Vision router base class (planned: moondream2 / Claude vision dispatch)
- `circuitforge_core.wizard` — First-run wizard base class (products subclass `BaseWizard`)
- `circuitforge_core.pipeline` — Staging queue base (`StagingDB`; products provide concrete schema)
---
## Install
```bash
pip install -e .
# From PyPI
pip install circuitforge-core
# Editable install from source (recommended for product development)
pip install -e /path/to/circuitforge-core
# With optional extras
pip install circuitforge-core[pdf] # PDF/DOCX/OCR document ingestion
pip install circuitforge-core[vector] # SQLite-vec vector store
pip install circuitforge-core[text-transformers] # Local transformer inference (cf-text)
pip install circuitforge-core[stt-faster-whisper] # Speech-to-text via Faster Whisper
pip install circuitforge-core[tts-chatterbox] # Text-to-speech via Chatterbox
pip install circuitforge-core[reranker-qwen3] # Reranking via Qwen3
pip install circuitforge-core[community] # PostgreSQL-backed community store
pip install circuitforge-core[manage] # cf-manage CLI (Typer)
pip install circuitforge-core[dev] # All dev dependencies
```
---
## Modules
| Module | Status | Description |
|---|---|---|
| `db` | Implemented | SQLite connection factory and migration runner |
| `llm` | Implemented | LLM router with priority fallback chain (Ollama, vLLM, Anthropic, OpenAI-compatible) |
| `tiers` | Implemented | Tier system with BYOK and local-vision unlocks (Free / Paid / Premium / Ultra) |
| `config` | Implemented | Env validation and `.env` loader with startup fail-fast |
| `hardware` | Implemented | GPU/CPU detection, VRAM profiling, backend profile generation |
| `documents` | Implemented | PDF, DOCX, and image OCR ingestion into `StructuredDocument` |
| `affiliates` | Implemented | Affiliate URL wrapping with per-user opt-out and env-var fallback |
| `preferences` | Implemented | User preference store — local YAML with pluggable backend; dot-path get/set |
| `tasks` | Implemented | VRAM-aware LLM task scheduler; shared slot manager across services |
| `manage` | Implemented | Cross-platform product process manager (Docker and native modes) |
| `resources` | Implemented | VRAM allocation, eviction engine, GPU profile registry |
| `text` | Implemented | Text processing utilities and local transformer inference service |
| `activitypub` | Implemented | ActivityPub actor, inbox, delivery, and Lemmy federation primitives |
| `audio` | Implemented | Audio buffer, format conversion, resampling, and VAD (voice activity detection) gate |
| `stt` | Implemented | Speech-to-text service (Faster Whisper backend) |
| `tts` | Implemented | Text-to-speech service (Chatterbox backend) |
| `musicgen` | Implemented | Music generation service (AudioCraft/MusicGen backend) |
| `reranker` | Implemented | Result reranking — BGE, Qwen3, cross-encoder, and Cohere adapters |
| `vector` | Implemented | SQLite-vec vector store with pluggable embedding backend |
| `api` | Implemented | Shared API helpers — corrections and feedback endpoints |
| `community` | Implemented | Community feed and social store (PostgreSQL-backed) |
| `platforms` | Implemented | Platform-specific integrations (eBay) |
| `cloud_session` | Implemented | Cloud session management primitives |
| `input` | Implemented | Input handling — MediaPipe gesture recognition |
| `job_quality` | Implemented | Job listing quality scoring and signal extraction |
| `vision` | Stub | Vision router (moondream2 / SigLIP dispatch — planned) |
| `wizard` | Stub | First-run wizard base class — products subclass `BaseWizard` |
| `pipeline` | Stub | Staging queue base — products provide concrete schema |
---
## Usage: LLM Router
The LLM router reads a config file at `~/.config/circuitforge/llm.yaml`, tries each backend in fallback order, and skips unreachable or disabled entries transparently.
```python
from circuitforge_core.llm import LLMRouter
# Auto-detects from env vars when llm.yaml is absent:
# ANTHROPIC_API_KEY, OPENAI_API_KEY / OPENAI_BASE_URL, OLLAMA_HOST
router = LLMRouter()
response = router.complete(
messages=[{"role": "user", "content": "Summarize this in one sentence."}],
system="You are a concise assistant.",
)
print(response)
```
**Example `llm.yaml`** (Ollama local, Anthropic cloud fallback):
```yaml
fallback_order:
- ollama
- anthropic
backends:
ollama:
type: openai_compat
enabled: true
base_url: http://localhost:11434/v1
model: llama3.2:3b
anthropic:
type: anthropic
enabled: true
model: claude-haiku-4-5-20251001
api_key_env: ANTHROPIC_API_KEY
supports_images: true
```
---
## Usage: Database + Migrations
```python
from circuitforge_core.db import get_connection, run_migrations
from pathlib import Path
# Run product migrations on startup
run_migrations(db_path=Path("data/app.db"), migrations_dir=Path("db/migrations"))
# Get a connection anywhere in your app
with get_connection(Path("data/app.db")) as conn:
conn.execute("INSERT INTO items (name) VALUES (?)", ("example",))
```
---
## Used by
| Product | Description |
|---|---|
| [peregrine](https://git.opensourcesolarpunk.com/Circuit-Forge/peregrine) | Job search — discovery, cover letters, interview prep |
| [snipe](https://git.opensourcesolarpunk.com/Circuit-Forge/snipe) | Auction sniping — eBay trust scoring, bid timing |
| [kiwi](https://git.opensourcesolarpunk.com/Circuit-Forge/kiwi) | Pantry tracker with barcode/receipt OCR and recipe suggestions |
| [avocet](https://git.opensourcesolarpunk.com/Circuit-Forge/avocet) | Email classifier training and benchmark harness |
| [osprey](https://git.opensourcesolarpunk.com/Circuit-Forge/osprey) | Government hold-line automation |
| [linnet](https://git.opensourcesolarpunk.com/Circuit-Forge/linnet) | Real-time tone annotation and voice transcription |
| pagepiper | PDF/rulebook RAG (retrieval-augmented generation) search |
---
## Contributing
circuitforge-core is MIT licensed. Contributions are welcome.
```bash
git clone https://git.opensourcesolarpunk.com/Circuit-Forge/circuitforge-core
cd circuitforge-core
pip install -e ".[dev]"
pytest
```
- New modules belong in `circuitforge_core/<module>/` as a package, not a flat file
- Keep modules focused — extract when a module exceeds 400 lines
- All public functions need type annotations
- Tests live in `tests/` — aim for 80% coverage on new code
- Use `ruff` for linting before submitting a PR
Open issues and PRs at: [git.opensourcesolarpunk.com/Circuit-Forge/circuitforge-core](https://git.opensourcesolarpunk.com/Circuit-Forge/circuitforge-core)
---
## License
BSL 1.1 — see LICENSE
MIT — see [LICENSE](LICENSE).
This is the fully open layer of the CircuitForge stack. Products built on top of circuitforge-core may carry different licenses (BSL 1.1 for AI features, proprietary for fine-tuned weights). The scaffold itself is and will remain MIT.
---
Humans own design, architecture, code review, testing, and verification. LLMs are part of our development workflow. [Our positions on LLM use →](https://circuitforge.tech/positions)

View file

@ -0,0 +1,22 @@
-- 006_community_dedup.sql
-- Adds variation-linking and title search support for community recipe dedup.
-- Applies to: cf_community PostgreSQL database.
-- BSL boundary: MIT (data layer, no inference).
-- Nullable self-referential FK: user-declared "this is a variation of X"
ALTER TABLE community_posts
ADD COLUMN IF NOT EXISTS similar_to_ref TEXT REFERENCES community_posts(slug) ON DELETE SET NULL;
-- Index for variation lookup (find all variations of a parent post)
CREATE INDEX IF NOT EXISTS idx_community_posts_similar_ref
ON community_posts (similar_to_ref)
WHERE similar_to_ref IS NOT NULL;
-- Index to speed up title ILIKE prefix and substring searches
CREATE INDEX IF NOT EXISTS idx_community_posts_title_lower
ON community_posts (lower(title));
-- Index on recipe_id for exact-recipe duplicate detection
CREATE INDEX IF NOT EXISTS idx_community_posts_recipe_id
ON community_posts (recipe_id)
WHERE recipe_id IS NOT NULL;

View file

@ -66,6 +66,9 @@ class CommunityPost:
protein_pct: float | None
moisture_pct: float | None
# Variation link: slug of the parent post this is explicitly a variation of
similar_to_ref: str | None = None
def __post_init__(self) -> None:
# Coerce list fields to tuples (frozen dataclass: use object.__setattr__)
for key in ("slots", "dietary_tags", "allergen_flags", "flavor_molecules"):

View file

@ -46,6 +46,7 @@ def _row_to_post(row: dict) -> CommunityPost:
fat_pct=row.get("fat_pct"),
protein_pct=row.get("protein_pct"),
moisture_pct=row.get("moisture_pct"),
similar_to_ref=row.get("similar_to_ref"),
)
@ -137,6 +138,61 @@ class SharedStore:
finally:
self._db.putconn(conn)
def search_similar_posts(
self,
title: str,
recipe_id: int | None = None,
post_type: str | None = None,
limit: int = 8,
) -> list[CommunityPost]:
"""Return posts similar to the given title or with the same recipe_id.
Used by the dedup check before a new post is submitted. Matches on:
- exact recipe_id (strongest signal)
- case-insensitive title substring match
Results are ordered: recipe_id matches first, then by published desc.
"""
conn = self._db.getconn()
try:
conditions: list[str] = []
params: list = []
title_condition = "lower(title) LIKE lower(%s)"
title_param = f"%{title.lower()[:80]}%"
if recipe_id is not None:
conditions.append(f"(recipe_id = %s OR {title_condition})")
params.extend([recipe_id, title_param])
else:
conditions.append(title_condition)
params.append(title_param)
if post_type:
conditions.append("post_type = %s")
params.append(post_type)
where = "WHERE " + " AND ".join(conditions)
params.append(limit)
order_clause = (
"ORDER BY (recipe_id = %s) DESC, published DESC"
if recipe_id is not None
else "ORDER BY published DESC"
)
if recipe_id is not None:
params.insert(-1, recipe_id)
with conn.cursor() as cur:
cur.execute(
f"SELECT * FROM community_posts {where} {order_clause} LIMIT %s",
params,
)
rows = cur.fetchall()
return [_row_to_post(_cursor_to_dict(cur, r)) for r in rows]
finally:
self._db.putconn(conn)
# ------------------------------------------------------------------
# Writes
# ------------------------------------------------------------------
@ -156,13 +212,14 @@ class SharedStore:
seasoning_score, richness_score, brightness_score,
depth_score, aroma_score, structure_score, texture_profile,
dietary_tags, allergen_flags, flavor_molecules,
fat_pct, protein_pct, moisture_pct, source_product
fat_pct, protein_pct, moisture_pct, source_product,
similar_to_ref
) VALUES (
%s, %s, %s, %s, %s, %s, %s,
%s::jsonb, %s, %s, %s, %s,
%s, %s, %s, %s, %s, %s, %s,
%s::jsonb, %s::jsonb, %s::jsonb,
%s, %s, %s, %s
%s, %s, %s, %s, %s
)
""",
(
@ -178,6 +235,7 @@ class SharedStore:
json.dumps(list(post.flavor_molecules)),
post.fat_pct, post.protein_pct, post.moisture_pct,
self._source_product,
post.similar_to_ref,
),
)
conn.commit()

View file

@ -190,6 +190,14 @@ class LLMRouter:
"""
If backend config has a cf_orch block and CF_ORCH_URL is set (env takes
precedence over yaml url), allocate via cf-orch and return (ctx, alloc).
Two allocation modes:
- task-based (preferred): cf_orch block has `product` + `task` keys.
Calls POST /api/inference/task; coordinator resolves model/node from
assignments.yaml. No hardcoded model IDs in product config.
- service-based (legacy): cf_orch block has `service` key.
Calls allocate(service=...) directly.
Returns None if not configured or allocation fails.
Caller MUST call ctx.__exit__(None, None, None) in a finally block.
"""
@ -205,16 +213,22 @@ class LLMRouter:
from circuitforge_orch.client import CFOrchClient
client = CFOrchClient(orch_url)
service = orch_cfg.get("service", "vllm")
candidates = orch_cfg.get("model_candidates", [])
ttl_s = float(orch_cfg.get("ttl_s", 3600.0))
# CF_APP_NAME identifies the calling product (kiwi, peregrine, etc.)
# in coordinator analytics — set in each product's .env.
# Task-based allocation: product+task → coordinator resolves model/node.
task = orch_cfg.get("task")
product = orch_cfg.get("product") or os.environ.get("CF_APP_NAME") or None
if task and product:
ctx = client.task_allocate(product, task, ttl_s=ttl_s)
alloc = ctx.__enter__()
return (ctx, alloc)
# Service-based allocation (legacy path).
cf_app = os.environ.get("CF_APP_NAME") or None
caller = f"{cf_app}.llm-router" if cf_app else "llm-router"
ctx = client.allocate(
service,
model_candidates=candidates,
orch_cfg.get("service", "vllm"),
model_candidates=orch_cfg.get("model_candidates", []),
ttl_s=ttl_s,
caller=caller,
pipeline=cf_app,

View file

@ -0,0 +1,11 @@
"""
circuitforge_core.video cf-video service: video VLM inference via Marlin-2B.
Exposes a FastAPI process (managed by cf-orch) with endpoints:
GET /health {"status": "ok", "model": str, "vram_mb": int}
POST /caption CaptionResult (scene description + timestamped events)
POST /find FindResult (temporal grounding span for a natural-language event)
Run as:
python -m circuitforge_core.video.app --model /path/to/NemoStation--Marlin-2B --port 8016 --gpu-id 0
"""

View file

@ -0,0 +1,191 @@
"""
cf-video FastAPI service managed by cf-orch.
Endpoints:
GET /health {"status": "ok", "model": str, "vram_mb": int}
POST /caption CaptionResponse (scene + timestamped events)
POST /find FindResponse (temporal grounding span)
Usage:
python -m circuitforge_core.video.app \
--model /Library/Assets/LLM/cf-video/models/NemoStation--Marlin-2B \
--port 8016 \
--gpu-id 0
The service loads the model once at startup and blocks until it is ready.
cf-orch health-polls /health before routing any inference requests.
Model requirements:
transformers >= 5.7.0
torch >= 2.11.0
torchcodec (installed)
qwen-vl-utils >= 0.0.14 (installed)
Security:
Marlin requires trust_remote_code=True. Review the model's
modeling_marlin.py before deploying on a production node.
"""
from __future__ import annotations
import argparse
import logging
import os
from typing import Any
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from circuitforge_core.video.backends.base import VideoBackend, make_video_backend
app = FastAPI(title="cf-video", version="0.1.0")
_backend: VideoBackend | None = None
# ── Request / response models ─────────────────────────────────────────────────
class CaptionRequest(BaseModel):
video_path: str = Field(..., description="Absolute path to the video file on this node")
max_new_tokens: int = Field(2048, ge=64, le=8192)
class VideoEventOut(BaseModel):
start: float
end: float
description: str
class CaptionResponse(BaseModel):
scene: str
events: list[VideoEventOut]
caption: str
model: str
class FindRequest(BaseModel):
video_path: str = Field(..., description="Absolute path to the video file on this node")
event: str = Field(..., min_length=1, description="Natural-language event description to locate")
max_new_tokens: int = Field(256, ge=32, le=2048)
class FindResponse(BaseModel):
span: list[float] | None = Field(
None,
description="[start_sec, end_sec] or null when the model could not ground the event",
)
format_ok: bool
raw: str
model: str
# ── Endpoints ─────────────────────────────────────────────────────────────────
@app.get("/health")
def health() -> dict[str, Any]:
if _backend is None:
raise HTTPException(503, detail="backend not initialised")
return {
"status": "ok",
"model": _backend.model_name,
"vram_mb": _backend.vram_mb,
}
@app.post("/caption", response_model=CaptionResponse)
def caption(req: CaptionRequest) -> CaptionResponse:
if _backend is None:
raise HTTPException(503, detail="backend not initialised")
try:
result = _backend.caption(req.video_path, max_new_tokens=req.max_new_tokens)
except FileNotFoundError as exc:
raise HTTPException(404, detail=str(exc)) from exc
except Exception as exc:
logging.exception("caption failed for %r", req.video_path)
raise HTTPException(500, detail=str(exc)) from exc
return CaptionResponse(
scene=result.scene,
events=[
VideoEventOut(start=ev.start, end=ev.end, description=ev.description)
for ev in result.events
],
caption=result.caption,
model=result.model,
)
@app.post("/find", response_model=FindResponse)
def find(req: FindRequest) -> FindResponse:
if _backend is None:
raise HTTPException(503, detail="backend not initialised")
try:
result = _backend.find(
req.video_path,
req.event,
max_new_tokens=req.max_new_tokens,
)
except FileNotFoundError as exc:
raise HTTPException(404, detail=str(exc)) from exc
except ValueError as exc:
raise HTTPException(422, detail=str(exc)) from exc
except Exception as exc:
logging.exception("find failed for %r event=%r", req.video_path, req.event)
raise HTTPException(500, detail=str(exc)) from exc
return FindResponse(
span=list(result.span) if result.span is not None else None,
format_ok=result.format_ok,
raw=result.raw,
model=result.model,
)
# ── CLI entry point ───────────────────────────────────────────────────────────
def _parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(description="cf-video service (Marlin-2B)")
p.add_argument(
"--model",
required=True,
help="Local filesystem path to the Marlin model directory (safetensors)",
)
p.add_argument("--port", type=int, default=8016)
p.add_argument("--host", default="0.0.0.0")
p.add_argument(
"--gpu-id", type=int, default=0,
help="CUDA device index; overridden by CUDA_VISIBLE_DEVICES when set by cf-orch",
)
p.add_argument("--device", default="cuda", choices=["cuda", "cpu"])
p.add_argument(
"--mock", action="store_true",
help="Run with MockVideoBackend (no GPU, for testing)",
)
return p.parse_args()
if __name__ == "__main__":
import uvicorn
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s%(message)s",
)
args = _parse_args()
# Pin GPU selection unconditionally — --gpu-id is authoritative.
# Force PCI_BUS_ID ordering so --gpu-id matches nvidia-smi (not CUDA's
# default FASTEST_FIRST, which can swap indices on multi-GPU nodes).
if args.device == "cuda" and not args.mock:
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = str(args.gpu_id)
mock = args.mock or args.model == "mock"
device = "cpu" if mock else args.device
_backend = make_video_backend(
model_path=args.model,
mock=mock,
device=device,
gpu_id=args.gpu_id,
)
uvicorn.run(app, host=args.host, port=args.port, log_level="info")

View file

@ -0,0 +1 @@
"""Video backend registry."""

View file

@ -0,0 +1,96 @@
"""
VideoBackend Protocol backend-agnostic interface for video VLM inference.
Implementations:
MarlinBackend NemoStation/Marlin-2B (dense captioning + temporal grounding)
MockVideoBackend deterministic stub for unit tests
Both endpoints accept a video_path (local filesystem path) so the service
receives pre-staged video files rather than raw byte streams. Large uploads
should be staged by the caller before hitting /caption or /find.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Protocol, runtime_checkable
# ── Result types ─────────────────────────────────────────────────────────────
@dataclass(frozen=True)
class VideoEvent:
"""A single timestamped event from a caption pass."""
start: float # seconds from video start
end: float # seconds from video start
description: str
@dataclass(frozen=True)
class CaptionResult:
"""Result from a /caption call."""
scene: str # scene-level description paragraph
events: list[VideoEvent] # timestamped event list (may be empty)
caption: str # full raw caption string from the model
model: str # model name / path
@dataclass(frozen=True)
class FindResult:
"""Result from a /find call."""
span: tuple[float, float] | None # (start_sec, end_sec) or None on parse failure
format_ok: bool # True when model output matched expected format
raw: str # raw model output for debugging
model: str
# ── Backend Protocol ─────────────────────────────────────────────────────────
@runtime_checkable
class VideoBackend(Protocol):
"""Minimal interface all video backends must satisfy."""
def caption(
self,
video_path: str,
*,
max_new_tokens: int = 2048,
) -> CaptionResult: ...
def find(
self,
video_path: str,
event: str,
*,
max_new_tokens: int = 256,
) -> FindResult: ...
@property
def model_name(self) -> str: ...
@property
def vram_mb(self) -> int: ...
# ── Factory ──────────────────────────────────────────────────────────────────
def make_video_backend(
model_path: str,
*,
mock: bool = False,
device: str = "cuda",
gpu_id: int = 0,
) -> VideoBackend:
"""Instantiate the appropriate VideoBackend.
Args:
model_path: Local filesystem path to the model directory (safetensors).
mock: When True, return MockVideoBackend (no GPU required).
device: Torch device string ("cuda" or "cpu").
gpu_id: CUDA device index used only when CUDA_VISIBLE_DEVICES is
not already set externally (cf-orch sets it before spawning).
"""
if mock:
from circuitforge_core.video.backends.mock import MockVideoBackend
return MockVideoBackend(model_path)
from circuitforge_core.video.backends.marlin import MarlinBackend
return MarlinBackend(model_path=model_path, device=device)

View file

@ -0,0 +1,184 @@
"""
MarlinBackend NemoStation/Marlin-2B video VLM via HuggingFace Transformers.
Marlin-2B is a decoder-only video understanding model that produces:
- Dense scene captions with second-precise event timestamps (/caption)
- Temporal grounding of natural-language events (/find)
Requirements (install separately):
pip install "transformers>=5.7.0" "torch>=2.11.0" torchcodec "qwen-vl-utils>=0.0.14" av pillow
Security note:
trust_remote_code=True is required. The model ships a custom
AutoModelForCausalLM subclass (modeling_marlin.py). Review that file
before enabling on any node. The modeling code runs in-process with
full filesystem access.
Environment variables forwarded to the model's preprocessing layer:
FORCE_QWENVL_VIDEO_READER default: torchcodec (video decode backend)
VIDEO_MAX_PIXELS default: 200704 (max pixels per frame)
FPS default: 2.0 (frame sample rate)
FPS_MAX_FRAMES default: 240 (frame cap ~2 min video)
FPS_MIN_FRAMES default: 4 (minimum frames)
"""
from __future__ import annotations
import logging
import os
from pathlib import Path
from circuitforge_core.video.backends.base import CaptionResult, FindResult, VideoEvent
logger = logging.getLogger(__name__)
# Default env overrides so torchcodec is preferred over the slower av/ffmpeg path.
_DEFAULT_ENV: dict[str, str] = {
"FORCE_QWENVL_VIDEO_READER": "torchcodec",
}
class MarlinBackend:
"""
Load Marlin-2B once, expose caption() and find() as synchronous calls.
The model is loaded eagerly in __init__ if loading fails (OOM, missing
weights, transformers version mismatch) the error propagates immediately
rather than on first inference, so cf-orch's 2-second liveness check can
catch it.
"""
def __init__(self, model_path: str, device: str = "cuda") -> None:
self._model_path = model_path
self._device = device
# Apply env defaults before importing transformers — the model's
# custom __init__.py reads these at import time.
for key, val in _DEFAULT_ENV.items():
os.environ.setdefault(key, val)
self._model = self._load_model(model_path, device)
self._vram_mb = self._estimate_vram_mb()
logger.info(
"MarlinBackend: loaded %r on %s (~%d MB VRAM)",
model_path, device, self._vram_mb,
)
# ── Loading ──────────────────────────────────────────────────────────────
def _load_model(self, model_path: str, device: str):
import torch
from transformers import AutoModelForCausalLM
# Verify weights exist before handing to transformers — gives a clear
# error instead of a cryptic trust_remote_code failure.
path = Path(model_path)
if not path.exists():
raise FileNotFoundError(
f"Marlin model directory not found: {model_path!r}. "
"Download via Avocet or: "
f"huggingface-cli download NemoStation/Marlin-2B --local-dir {model_path}"
)
logger.info("MarlinBackend: loading model from %r ...", model_path)
model = AutoModelForCausalLM.from_pretrained(
model_path,
trust_remote_code=True, # Required — custom modeling code in repo
torch_dtype=torch.bfloat16,
device_map={"": device},
)
model.eval()
logger.info("MarlinBackend: model loaded")
return model
def _estimate_vram_mb(self) -> int:
"""Read allocated VRAM from torch after load; fall back to catalog estimate."""
try:
import torch
if torch.cuda.is_available():
return int(torch.cuda.memory_allocated() / 1024 / 1024)
except Exception:
pass
return 4500 # Catalog estimate for Marlin-2B BF16
# ── Inference ────────────────────────────────────────────────────────────
def caption(
self,
video_path: str,
*,
max_new_tokens: int = 2048,
) -> CaptionResult:
"""Produce a dense caption with scene description and timestamped events."""
if not os.path.exists(video_path):
raise FileNotFoundError(f"Video file not found: {video_path!r}")
raw_result: dict = self._model.caption(
video_path,
max_new_tokens=max_new_tokens,
do_sample=False,
)
events = [
VideoEvent(
start=float(ev["start"]),
end=float(ev["end"]),
description=str(ev["description"]),
)
for ev in raw_result.get("events", [])
]
return CaptionResult(
scene=str(raw_result.get("scene", "")),
events=events,
caption=str(raw_result.get("caption", "")),
model=self.model_name,
)
def find(
self,
video_path: str,
event: str,
*,
max_new_tokens: int = 256,
) -> FindResult:
"""Ground a natural-language event query to a video time span."""
if not os.path.exists(video_path):
raise FileNotFoundError(f"Video file not found: {video_path!r}")
if not event.strip():
raise ValueError("event query must not be empty")
raw_result: dict = self._model.find(
video_path,
event=event,
max_new_tokens=max_new_tokens,
do_sample=False,
)
# Marlin returns span as a (start, end) tuple or None.
raw_span = raw_result.get("span")
span: tuple[float, float] | None = None
if raw_span is not None:
try:
span = (float(raw_span[0]), float(raw_span[1]))
except (TypeError, IndexError, ValueError):
logger.warning(
"MarlinBackend.find: could not parse span %r for event %r",
raw_span, event,
)
return FindResult(
span=span,
format_ok=bool(raw_result.get("format_ok", False)),
raw=str(raw_result.get("raw", "")),
model=self.model_name,
)
# ── Properties ───────────────────────────────────────────────────────────
@property
def model_name(self) -> str:
return self._model_path
@property
def vram_mb(self) -> int:
return self._vram_mb

View file

@ -0,0 +1,68 @@
"""
MockVideoBackend deterministic stub for unit tests and CI.
Returns fixed CaptionResult / FindResult without any model or video I/O.
"""
from __future__ import annotations
import os
from circuitforge_core.video.backends.base import (
CaptionResult,
FindResult,
VideoEvent,
)
_MOCK_SCENE = "A mock scene with placeholder content."
_MOCK_EVENTS = [
VideoEvent(start=0.0, end=3.0, description="Mock event one"),
VideoEvent(start=3.5, end=7.2, description="Mock event two"),
]
_MOCK_CAPTION = "Scene: A mock scene with placeholder content. Events: [0.0-3.0] Mock event one. [3.5-7.2] Mock event two."
_MOCK_FIND_SPAN = (3.5, 7.2)
class MockVideoBackend:
"""No-GPU stub. Safe for import on any machine."""
def __init__(self, model_path: str = "mock") -> None:
self._model_path = model_path
def caption(
self,
video_path: str,
*,
max_new_tokens: int = 2048,
) -> CaptionResult:
if not os.path.exists(video_path):
raise FileNotFoundError(f"Video not found: {video_path!r}")
return CaptionResult(
scene=_MOCK_SCENE,
events=list(_MOCK_EVENTS),
caption=_MOCK_CAPTION,
model=self.model_name,
)
def find(
self,
video_path: str,
event: str,
*,
max_new_tokens: int = 256,
) -> FindResult:
if not os.path.exists(video_path):
raise FileNotFoundError(f"Video not found: {video_path!r}")
return FindResult(
span=_MOCK_FIND_SPAN,
format_ok=True,
raw="From 3.5 to 7.2.",
model=self.model_name,
)
@property
def model_name(self) -> str:
return self._model_path
@property
def vram_mb(self) -> int:
return 0

View file

@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "circuitforge-core"
version = "0.20.0"
version = "0.21.0"
description = "Shared scaffold for CircuitForge products (MIT)"
requires-python = ">=3.11"
dependencies = [
@ -66,6 +66,21 @@ musicgen-service = [
"uvicorn[standard]>=0.29",
"python-multipart>=0.0.9",
]
video-marlin = [
"torch>=2.11",
"transformers>=5.7.0",
"torchvision", # On CUDA 13 nodes install from PyTorch nightly: pip install --index-url https://download.pytorch.org/whl/nightly/cu130 torch torchvision
"torchcodec",
"qwen-vl-utils>=0.0.14",
"av",
"Pillow>=10.0",
"accelerate>=0.27",
]
video-service = [
"circuitforge-core[video-marlin]",
"fastapi>=0.110",
"uvicorn[standard]>=0.29",
]
vision-siglip = [
"torch>=2.0",
"transformers>=4.40",
@ -115,6 +130,18 @@ pdf = [
vector = [
"sqlite-vec>=0.1",
]
mqtt = [
"aiomqtt>=2.0",
]
meshtastic-serial = [
"meshtastic>=2.5",
"pypubsub>=4.0",
]
meshtastic-service = [
"circuitforge-core[mqtt,meshtastic-serial]",
"fastapi>=0.110",
"uvicorn[standard]>=0.29",
]
dev = [
"circuitforge-core[manage]",
"pytest>=8.0",

View file

View file

@ -0,0 +1,236 @@
"""
Tests for the cf-video FastAPI app using mock backend.
Tests run without GPU, torch, or a real video file.
MockVideoBackend checks os.path.exists() but never reads video content,
so a zero-byte placeholder is sufficient.
"""
from __future__ import annotations
import pytest
from fastapi.testclient import TestClient
import circuitforge_core.video.app as video_app
from circuitforge_core.video.backends.mock import MockVideoBackend
# ── Fixtures ──────────────────────────────────────────────────────────────────
@pytest.fixture(autouse=True)
def inject_mock_backend():
"""Replace global backend with mock before each test; restore after."""
original = video_app._backend
video_app._backend = MockVideoBackend()
yield
video_app._backend = original
@pytest.fixture()
def client():
return TestClient(video_app.app)
@pytest.fixture()
def video_file(tmp_path):
"""Placeholder file that satisfies os.path.exists() inside the mock."""
p = tmp_path / "sample.mp4"
p.write_bytes(b"\x00" * 16)
return str(p)
# ── /health ───────────────────────────────────────────────────────────────────
def test_health_returns_ok(client):
resp = client.get("/health")
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "ok"
assert data["model"] == "mock"
assert data["vram_mb"] == 0
def test_health_503_when_no_backend(client):
video_app._backend = None
resp = client.get("/health")
assert resp.status_code == 503
# ── /caption ──────────────────────────────────────────────────────────────────
def test_caption_returns_200(client, video_file):
resp = client.post("/caption", json={"video_path": video_file})
assert resp.status_code == 200
def test_caption_response_has_scene(client, video_file):
data = client.post("/caption", json={"video_path": video_file}).json()
assert isinstance(data["scene"], str)
assert data["scene"]
def test_caption_response_has_events(client, video_file):
data = client.post("/caption", json={"video_path": video_file}).json()
assert isinstance(data["events"], list)
assert len(data["events"]) >= 1
def test_caption_events_have_timestamps(client, video_file):
data = client.post("/caption", json={"video_path": video_file}).json()
for ev in data["events"]:
assert "start" in ev
assert "end" in ev
assert "description" in ev
assert ev["start"] <= ev["end"]
def test_caption_response_has_caption(client, video_file):
data = client.post("/caption", json={"video_path": video_file}).json()
assert isinstance(data["caption"], str)
assert data["caption"]
def test_caption_response_model_field(client, video_file):
data = client.post("/caption", json={"video_path": video_file}).json()
assert isinstance(data["model"], str)
def test_caption_404_on_missing_file(client):
resp = client.post("/caption", json={"video_path": "/no/such/file.mp4"})
assert resp.status_code == 404
def test_caption_503_when_no_backend(client, video_file):
video_app._backend = None
resp = client.post("/caption", json={"video_path": video_file})
assert resp.status_code == 503
def test_caption_custom_max_new_tokens(client, video_file):
resp = client.post(
"/caption",
json={"video_path": video_file, "max_new_tokens": 512},
)
assert resp.status_code == 200
def test_caption_rejects_max_new_tokens_below_min(client, video_file):
resp = client.post(
"/caption",
json={"video_path": video_file, "max_new_tokens": 10},
)
assert resp.status_code == 422
def test_caption_rejects_max_new_tokens_above_max(client, video_file):
resp = client.post(
"/caption",
json={"video_path": video_file, "max_new_tokens": 99999},
)
assert resp.status_code == 422
# ── /find ─────────────────────────────────────────────────────────────────────
def test_find_returns_200(client, video_file):
resp = client.post(
"/find",
json={"video_path": video_file, "event": "someone waves"},
)
assert resp.status_code == 200
def test_find_response_has_span(client, video_file):
data = client.post(
"/find",
json={"video_path": video_file, "event": "mock event"},
).json()
# MockVideoBackend always returns a non-null span
assert data["span"] is not None
assert len(data["span"]) == 2
assert data["span"][0] <= data["span"][1]
def test_find_span_is_list_of_floats(client, video_file):
data = client.post(
"/find",
json={"video_path": video_file, "event": "mock event"},
).json()
span = data["span"]
assert all(isinstance(v, float) for v in span)
def test_find_format_ok_field(client, video_file):
data = client.post(
"/find",
json={"video_path": video_file, "event": "mock event"},
).json()
assert data["format_ok"] is True
def test_find_raw_field(client, video_file):
data = client.post(
"/find",
json={"video_path": video_file, "event": "mock event"},
).json()
assert isinstance(data["raw"], str)
def test_find_model_field(client, video_file):
data = client.post(
"/find",
json={"video_path": video_file, "event": "mock event"},
).json()
assert isinstance(data["model"], str)
def test_find_404_on_missing_file(client):
resp = client.post(
"/find",
json={"video_path": "/no/such/file.mp4", "event": "wave"},
)
assert resp.status_code == 404
def test_find_503_when_no_backend(client, video_file):
video_app._backend = None
resp = client.post(
"/find",
json={"video_path": video_file, "event": "wave"},
)
assert resp.status_code == 503
def test_find_rejects_empty_event(client, video_file):
resp = client.post(
"/find",
json={"video_path": video_file, "event": ""},
)
assert resp.status_code == 422
def test_find_custom_max_new_tokens(client, video_file):
resp = client.post(
"/find",
json={"video_path": video_file, "event": "wave", "max_new_tokens": 128},
)
assert resp.status_code == 200
def test_find_rejects_max_new_tokens_below_min(client, video_file):
resp = client.post(
"/find",
json={"video_path": video_file, "event": "wave", "max_new_tokens": 10},
)
assert resp.status_code == 422
def test_find_rejects_max_new_tokens_above_max(client, video_file):
resp = client.post(
"/find",
json={"video_path": video_file, "event": "wave", "max_new_tokens": 99999},
)
assert resp.status_code == 422

View file

@ -0,0 +1,157 @@
"""
Tests for MockVideoBackend and the VideoBackend protocol.
All tests run without a GPU, torch install, or any real video file
(MockVideoBackend only checks os.path.exists, not video validity).
"""
from __future__ import annotations
import os
import tempfile
import pytest
from circuitforge_core.video.backends.base import (
CaptionResult,
FindResult,
VideoBackend,
VideoEvent,
make_video_backend,
)
from circuitforge_core.video.backends.mock import MockVideoBackend
# ── Fixtures ──────────────────────────────────────────────────────────────────
@pytest.fixture()
def video_file(tmp_path):
"""Create a temporary file that satisfies os.path.exists() checks."""
p = tmp_path / "test.mp4"
p.write_bytes(b"\x00" * 16) # placeholder bytes; mock never reads content
return str(p)
# ── Protocol conformance ──────────────────────────────────────────────────────
def test_mock_satisfies_protocol():
backend = MockVideoBackend()
assert isinstance(backend, VideoBackend)
def test_mock_model_name_default():
assert MockVideoBackend().model_name == "mock"
def test_mock_model_name_custom():
assert MockVideoBackend(model_path="custom-path").model_name == "custom-path"
def test_mock_vram_mb():
assert MockVideoBackend().vram_mb == 0
# ── caption() ─────────────────────────────────────────────────────────────────
def test_caption_returns_caption_result(video_file):
result = MockVideoBackend().caption(video_file)
assert isinstance(result, CaptionResult)
def test_caption_scene_is_str(video_file):
result = MockVideoBackend().caption(video_file)
assert isinstance(result.scene, str)
assert result.scene # non-empty
def test_caption_events_are_video_events(video_file):
result = MockVideoBackend().caption(video_file)
assert isinstance(result.events, list)
for ev in result.events:
assert isinstance(ev, VideoEvent)
def test_caption_events_have_numeric_timestamps(video_file):
result = MockVideoBackend().caption(video_file)
for ev in result.events:
assert isinstance(ev.start, float)
assert isinstance(ev.end, float)
assert ev.start <= ev.end
def test_caption_caption_str(video_file):
result = MockVideoBackend().caption(video_file)
assert isinstance(result.caption, str)
assert result.caption
def test_caption_model_matches_path(video_file):
result = MockVideoBackend(model_path="test-model").caption(video_file)
assert result.model == "test-model"
def test_caption_raises_on_missing_file():
with pytest.raises(FileNotFoundError):
MockVideoBackend().caption("/nonexistent/video.mp4")
def test_caption_max_new_tokens_accepted(video_file):
"""max_new_tokens kwarg must be accepted without error."""
result = MockVideoBackend().caption(video_file, max_new_tokens=512)
assert isinstance(result, CaptionResult)
# ── find() ────────────────────────────────────────────────────────────────────
def test_find_returns_find_result(video_file):
result = MockVideoBackend().find(video_file, "someone waves")
assert isinstance(result, FindResult)
def test_find_span_is_tuple_or_none(video_file):
result = MockVideoBackend().find(video_file, "mock event")
# MockVideoBackend always returns a span
assert result.span is not None
assert len(result.span) == 2
assert result.span[0] <= result.span[1]
def test_find_format_ok_true(video_file):
result = MockVideoBackend().find(video_file, "mock event")
assert result.format_ok is True
def test_find_raw_is_str(video_file):
result = MockVideoBackend().find(video_file, "mock event")
assert isinstance(result.raw, str)
def test_find_model_matches_path(video_file):
result = MockVideoBackend(model_path="my-model").find(video_file, "event")
assert result.model == "my-model"
def test_find_raises_on_missing_file():
with pytest.raises(FileNotFoundError):
MockVideoBackend().find("/nonexistent/video.mp4", "event")
def test_find_max_new_tokens_accepted(video_file):
result = MockVideoBackend().find(video_file, "event", max_new_tokens=128)
assert isinstance(result, FindResult)
# ── make_video_backend factory ────────────────────────────────────────────────
def test_factory_returns_mock_when_flag_set():
backend = make_video_backend(model_path="mock", mock=True)
assert isinstance(backend, MockVideoBackend)
def test_factory_mock_uses_model_path():
backend = make_video_backend(model_path="some-path", mock=True)
assert backend.model_name == "some-path"