diff --git a/circuitforge_core/video/__init__.py b/circuitforge_core/video/__init__.py
new file mode 100644
index 0000000..627b0f0
--- /dev/null
+++ b/circuitforge_core/video/__init__.py
@@ -0,0 +1,11 @@
+"""
+circuitforge_core.video — cf-video service: video VLM inference via Marlin-2B.
+
+Exposes a FastAPI process (managed by cf-orch) with endpoints:
+  GET  /health   → {"status": "ok", "model": str, "vram_mb": int}
+  POST /caption  → CaptionResponse (scene description + timestamped events)
+  POST /find     → FindResponse (temporal grounding span for a natural-language event)
+
+Run as:
+  python -m circuitforge_core.video.app --model /path/to/NemoStation--Marlin-2B --port 8016 --gpu-id 0
+"""
diff --git a/circuitforge_core/video/app.py b/circuitforge_core/video/app.py
new file mode 100644
index 0000000..00a3ee3
--- /dev/null
+++ b/circuitforge_core/video/app.py
@@ -0,0 +1,189 @@
+"""
+cf-video FastAPI service — managed by cf-orch.
+
+Endpoints:
+    GET  /health   → {"status": "ok", "model": str, "vram_mb": int}
+    POST /caption  → CaptionResponse (scene + timestamped events)
+    POST /find     → FindResponse (temporal grounding span)
+
+Usage:
+    python -m circuitforge_core.video.app \
+        --model /Library/Assets/LLM/cf-video/models/NemoStation--Marlin-2B \
+        --port 8016 \
+        --gpu-id 0
+
+The service loads the model once at startup and blocks until it is ready.
+cf-orch health-polls /health before routing any inference requests.
+
+Model requirements:
+    transformers >= 5.7.0
+    torch >= 2.11.0
+    torchcodec (installed)
+    qwen-vl-utils >= 0.0.14 (installed)
+
+Security:
+    Marlin requires trust_remote_code=True. Review the model's
+    modeling_marlin.py before deploying on a production node.
+"""
+from __future__ import annotations
+
+import argparse
+import logging
+import os
+from typing import Any
+
+from fastapi import FastAPI, HTTPException
+from pydantic import BaseModel, Field
+
+from circuitforge_core.video.backends.base import VideoBackend, make_video_backend
+
+logger = logging.getLogger(__name__)
+
+app = FastAPI(title="cf-video", version="0.1.0")
+# Set once by the CLI entry point below (tests assign it directly); None until then.
+_backend: VideoBackend | None = None
+
+
+# ── Request / response models ─────────────────────────────────────────────────
+
+class CaptionRequest(BaseModel):
+    """Body of POST /caption."""
+
+    # NOTE(review): video_path is client-supplied and handed to the backend
+    # unchanged — confirm that only trusted callers can reach this service.
+    video_path: str = Field(..., description="Absolute path to the video file on this node")
+    max_new_tokens: int = Field(2048, ge=64, le=8192)
+
+
+class VideoEventOut(BaseModel):
+    """One timestamped event inside a CaptionResponse."""
+
+    start: float
+    end: float
+    description: str
+
+
+class CaptionResponse(BaseModel):
+    """Body returned by POST /caption."""
+
+    scene: str
+    events: list[VideoEventOut]
+    caption: str
+    model: str
+
+
+class FindRequest(BaseModel):
+    """Body of POST /find."""
+
+    video_path: str = Field(..., description="Absolute path to the video file on this node")
+    event: str = Field(..., min_length=1, description="Natural-language event description to locate")
+    max_new_tokens: int = Field(256, ge=32, le=2048)
+
+
+class FindResponse(BaseModel):
+    """Body returned by POST /find."""
+
+    span: list[float] | None = Field(
+        None,
+        description="[start_sec, end_sec] or null when the model could not ground the event",
+    )
+    format_ok: bool
+    raw: str
+    model: str
+
+
+# ── Endpoints ─────────────────────────────────────────────────────────────────
+
+def _require_backend() -> VideoBackend:
+    """Return the installed backend, or raise HTTP 503 when there is none."""
+    if _backend is None:
+        raise HTTPException(503, detail="backend not initialised")
+    return _backend
+
+
+@app.get("/health")
+def health() -> dict[str, Any]:
+    """Report readiness together with the backend's model name and VRAM figure.
+
+    Raises:
+        HTTPException: 503 while no backend is installed.
+    """
+    backend = _require_backend()
+    return {
+        "status": "ok",
+        "model": backend.model_name,
+        "vram_mb": backend.vram_mb,
+    }
+
+
+@app.post("/caption", response_model=CaptionResponse)
+def caption(req: CaptionRequest) -> CaptionResponse:
+    """Caption the video at ``req.video_path``.
+
+    Raises:
+        HTTPException: 503 while no backend is installed, 404 when the backend
+            raises FileNotFoundError, 500 for any other backend failure.
+    """
+    backend = _require_backend()
+    try:
+        result = backend.caption(req.video_path, max_new_tokens=req.max_new_tokens)
+    except FileNotFoundError as exc:
+        raise HTTPException(404, detail=str(exc)) from exc
+    except Exception as exc:
+        # Service boundary: log the traceback, then surface a 500 to the caller.
+        logger.exception("caption failed for %r", req.video_path)
+        raise HTTPException(500, detail=str(exc)) from exc
+
+    return CaptionResponse(
+        scene=result.scene,
+        events=[
+            VideoEventOut(start=ev.start, end=ev.end, description=ev.description)
+            for ev in result.events
+        ],
+        caption=result.caption,
+        model=result.model,
+    )
+
+
+@app.post("/find", response_model=FindResponse)
+def find(req: FindRequest) -> FindResponse:
+    """Locate ``req.event`` inside the video at ``req.video_path``.
+
+    Raises:
+        HTTPException: 503 while no backend is installed, 404 when the backend
+            raises FileNotFoundError, 422 when it raises ValueError, 500 for
+            any other backend failure.
+    """
+    backend = _require_backend()
+    try:
+        result = backend.find(
+            req.video_path,
+            req.event,
+            max_new_tokens=req.max_new_tokens,
+        )
+    except FileNotFoundError as exc:
+        raise HTTPException(404, detail=str(exc)) from exc
+    except ValueError as exc:
+        raise HTTPException(422, detail=str(exc)) from exc
+    except Exception as exc:
+        # Service boundary: log the traceback, then surface a 500 to the caller.
+        logger.exception("find failed for %r event=%r", req.video_path, req.event)
+        raise HTTPException(500, detail=str(exc)) from exc
+
+    return FindResponse(
+        # The backend result carries a tuple; the response model declares a list.
+        span=list(result.span) if result.span is not None else None,
+        format_ok=result.format_ok,
+        raw=result.raw,
+        model=result.model,
+    )
+
+
+# ── CLI entry point ───────────────────────────────────────────────────────────
+
+def _parse_args() -> argparse.Namespace:
+    """Parse the service's command-line flags from sys.argv."""
+    p = argparse.ArgumentParser(description="cf-video service (Marlin-2B)")
+    p.add_argument(
+        "--model",
+        required=True,
+        help="Local filesystem path to the Marlin model directory (safetensors)",
+    )
+    p.add_argument("--port", type=int, default=8016)
+    p.add_argument("--host", default="0.0.0.0")
+    p.add_argument(
+        "--gpu-id", type=int, default=0,
+        help="CUDA device index; overridden by CUDA_VISIBLE_DEVICES when set by cf-orch",
+    )
+    p.add_argument("--device", default="cuda", choices=["cuda", "cpu"])
+    p.add_argument(
+        "--mock", action="store_true",
+        help="Run with MockVideoBackend (no GPU, for testing)",
+    )
+    return p.parse_args()
+
+
+if __name__ == "__main__":
+    import uvicorn
+
+    logging.basicConfig(
+        level=logging.INFO,
+        format="%(asctime)s %(levelname)s %(name)s — %(message)s",
+    )
+    args = _parse_args()
+
+    # "--model mock" is shorthand for --mock. Resolve it before touching the
+    # environment so that neither spelling of a mock run sets CUDA variables.
+    mock = args.mock or args.model == "mock"
+    device = "cpu" if mock else args.device
+
+    # cf-orch sets CUDA_VISIBLE_DEVICES before spawning; only set it here when
+    # running the service manually (--gpu-id flag) without cf-orch.
+    if device == "cuda":
+        os.environ.setdefault("CUDA_VISIBLE_DEVICES", str(args.gpu_id))
+
+    _backend = make_video_backend(
+        model_path=args.model,
+        mock=mock,
+        device=device,
+        gpu_id=args.gpu_id,
+    )
+
+    uvicorn.run(app, host=args.host, port=args.port, log_level="info")
diff --git a/circuitforge_core/video/backends/__init__.py b/circuitforge_core/video/backends/__init__.py
new file mode 100644
index 0000000..df20cc6
--- /dev/null
+++ b/circuitforge_core/video/backends/__init__.py
@@ -0,0 +1 @@
+"""Video backend registry."""
diff --git a/circuitforge_core/video/backends/base.py b/circuitforge_core/video/backends/base.py
new file mode 100644
index 0000000..cead4c3
--- /dev/null
+++ b/circuitforge_core/video/backends/base.py
@@ -0,0 +1,96 @@
+"""
+VideoBackend Protocol — backend-agnostic interface for video VLM inference.
+
+Implementations:
+    MarlinBackend     — NemoStation/Marlin-2B (dense captioning + temporal grounding)
+    MockVideoBackend  — deterministic stub for unit tests
+
+Both endpoints accept a video_path (local filesystem path) so the service
+receives pre-staged video files rather than raw byte streams. Large uploads
+should be staged by the caller before hitting /caption or /find.
+"""
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from typing import Protocol, runtime_checkable
+
+
+# ── Result types ─────────────────────────────────────────────────────────────
+
+@dataclass(frozen=True)
+class VideoEvent:
+    """A single timestamped event from a caption pass."""
+    start: float        # seconds from video start
+    end: float          # seconds from video start
+    description: str
+
+
+@dataclass(frozen=True)
+class CaptionResult:
+    """Result from a /caption call.
+
+    frozen=True only blocks rebinding of the attributes; the ``events`` list
+    itself remains a mutable list.
+    """
+    scene: str                  # scene-level description paragraph
+    events: list[VideoEvent]    # timestamped event list (may be empty)
+    caption: str                # full raw caption string from the model
+    model: str                  # model name / path
+
+
+@dataclass(frozen=True)
+class FindResult:
+    """Result from a /find call."""
+    span: tuple[float, float] | None    # (start_sec, end_sec) or None on parse failure
+    format_ok: bool                     # True when model output matched expected format
+    raw: str                            # raw model output for debugging
+    model: str                          # model name / path
+
+
+# ── Backend Protocol ─────────────────────────────────────────────────────────
+
+@runtime_checkable
+class VideoBackend(Protocol):
+    """Minimal interface all video backends must satisfy.
+
+    @runtime_checkable allows isinstance() checks against this Protocol; such
+    checks only confirm that the members exist, not that signatures match.
+    """
+
+    def caption(
+        self,
+        video_path: str,
+        *,
+        max_new_tokens: int = 2048,
+    ) -> CaptionResult: ...
+
+    def find(
+        self,
+        video_path: str,
+        event: str,
+        *,
+        max_new_tokens: int = 256,
+    ) -> FindResult: ...
+
+    @property
+    def model_name(self) -> str: ...
+
+    @property
+    def vram_mb(self) -> int: ...
+
+
+# ── Factory ──────────────────────────────────────────────────────────────────
+
+def make_video_backend(
+    model_path: str,
+    *,
+    mock: bool = False,
+    device: str = "cuda",
+    gpu_id: int = 0,
+) -> VideoBackend:
+    """Instantiate the appropriate VideoBackend.
+
+    Backend modules are imported inside the branches, so selecting the mock
+    never imports the Marlin backend module.
+
+    Args:
+        model_path: Local filesystem path to the model directory (safetensors).
+        mock:       When True, return MockVideoBackend (no GPU required).
+        device:     Torch device string ("cuda" or "cpu"); not used when mock
+                    is True.
+        gpu_id:     CUDA device index. Accepted for interface compatibility but
+                    not read by this function: GPU selection goes through
+                    CUDA_VISIBLE_DEVICES, which cf-orch (or the app.py CLI)
+                    sets before the backend is constructed.
+
+    Returns:
+        MockVideoBackend(model_path) when mock is True, otherwise
+        MarlinBackend(model_path=model_path, device=device).
+    """
+    if mock:
+        from circuitforge_core.video.backends.mock import MockVideoBackend
+        return MockVideoBackend(model_path)
+    from circuitforge_core.video.backends.marlin import MarlinBackend
+    return MarlinBackend(model_path=model_path, device=device)
diff --git a/circuitforge_core/video/backends/marlin.py b/circuitforge_core/video/backends/marlin.py
new file mode 100644
index 0000000..81b4b8f
--- /dev/null
+++ b/circuitforge_core/video/backends/marlin.py
@@ -0,0 +1,184 @@
+"""
+MarlinBackend — NemoStation/Marlin-2B video VLM via HuggingFace Transformers.
+
+Marlin-2B is a decoder-only video understanding model that produces:
+  - Dense scene captions with second-precise event timestamps  (/caption)
+  - Temporal grounding of natural-language events              (/find)
+
+Requirements (install separately):
+    pip install "transformers>=5.7.0" "torch>=2.11.0" torchcodec "qwen-vl-utils>=0.0.14" av pillow
+
+Security note:
+    trust_remote_code=True is required. The model ships a custom
+    AutoModelForCausalLM subclass (modeling_marlin.py). Review that file
+    before enabling on any node. The modeling code runs in-process with
+    full filesystem access.
+
+Environment variables forwarded to the model's preprocessing layer:
+    FORCE_QWENVL_VIDEO_READER   default: torchcodec  (video decode backend)
+    VIDEO_MAX_PIXELS            default: 200704      (max pixels per frame)
+    FPS                         default: 2.0         (frame sample rate)
+    FPS_MAX_FRAMES              default: 240         (frame cap ~2 min video)
+    FPS_MIN_FRAMES              default: 4           (minimum frames)
+
+Only FORCE_QWENVL_VIDEO_READER is defaulted by this module (see _DEFAULT_ENV);
+the remaining values are listed for reference and are not set here.
+"""
+from __future__ import annotations
+
+import logging
+import os
+from pathlib import Path
+
+from circuitforge_core.video.backends.base import CaptionResult, FindResult, VideoEvent
+
+logger = logging.getLogger(__name__)
+
+# Default env overrides so torchcodec is preferred over the slower av/ffmpeg path.
+_DEFAULT_ENV: dict[str, str] = {
+    "FORCE_QWENVL_VIDEO_READER": "torchcodec",
+}
+
+
+class MarlinBackend:
+    """
+    Load Marlin-2B once, expose caption() and find() as synchronous calls.
+
+    The model is loaded eagerly in __init__ — if loading fails (OOM, missing
+    weights, transformers version mismatch) the error propagates immediately
+    rather than on first inference, so cf-orch's 2-second liveness check can
+    catch it.
+    """
+
+    def __init__(self, model_path: str, device: str = "cuda") -> None:
+        """Load the model at *model_path* onto *device*.
+
+        Raises:
+            FileNotFoundError: when *model_path* is not an existing directory.
+        """
+        self._model_path = model_path
+        self._device = device
+
+        # Apply env defaults before importing transformers — the model's
+        # custom __init__.py reads these at import time.
+        for key, val in _DEFAULT_ENV.items():
+            os.environ.setdefault(key, val)
+
+        self._model = self._load_model(model_path, device)
+        self._vram_mb = self._estimate_vram_mb()
+        logger.info(
+            "MarlinBackend: loaded %r on %s (~%d MB VRAM)",
+            model_path, device, self._vram_mb,
+        )
+
+    # ── Loading ──────────────────────────────────────────────────────────────
+
+    def _load_model(self, model_path: str, device: str):
+        """Validate *model_path*, then load the checkpoint in bfloat16 on *device*.
+
+        Raises:
+            FileNotFoundError: when *model_path* is not an existing directory.
+        """
+        # Verify the directory first — before the heavy torch/transformers
+        # imports — so a bad path gives a clear error instead of a cryptic
+        # trust_remote_code failure. is_dir() also rejects a path that exists
+        # but points at a regular file.
+        if not Path(model_path).is_dir():
+            raise FileNotFoundError(
+                f"Marlin model directory not found: {model_path!r}. "
+                "Download via Avocet or: "
+                f"huggingface-cli download NemoStation/Marlin-2B --local-dir {model_path}"
+            )
+
+        import torch
+        from transformers import AutoModelForCausalLM
+
+        logger.info("MarlinBackend: loading model from %r ...", model_path)
+        model = AutoModelForCausalLM.from_pretrained(
+            model_path,
+            trust_remote_code=True,   # Required — custom modeling code in repo
+            torch_dtype=torch.bfloat16,
+            device_map={"": device},
+        )
+        model.eval()
+        logger.info("MarlinBackend: model loaded")
+        return model
+
+    def _estimate_vram_mb(self) -> int:
+        """Read allocated VRAM from torch after load; fall back to catalog estimate."""
+        try:
+            import torch
+            if torch.cuda.is_available():
+                return int(torch.cuda.memory_allocated() / 1024 / 1024)
+        except Exception:
+            # Best-effort probe: keep the fallback, but leave a trace of why.
+            logger.debug(
+                "MarlinBackend: VRAM probe failed; using catalog estimate",
+                exc_info=True,
+            )
+        return 4500  # Catalog estimate for Marlin-2B BF16
+
+    # ── Inference ────────────────────────────────────────────────────────────
+
+    @staticmethod
+    def _require_video_file(video_path: str) -> None:
+        """Raise FileNotFoundError unless *video_path* is an existing regular file."""
+        # isfile() rather than exists(): a directory is not a video.
+        if not os.path.isfile(video_path):
+            raise FileNotFoundError(f"Video file not found: {video_path!r}")
+
+    def caption(
+        self,
+        video_path: str,
+        *,
+        max_new_tokens: int = 2048,
+    ) -> CaptionResult:
+        """Produce a dense caption with scene description and timestamped events.
+
+        Raises:
+            FileNotFoundError: when *video_path* is not an existing file.
+        """
+        self._require_video_file(video_path)
+
+        raw_result: dict = self._model.caption(
+            video_path,
+            max_new_tokens=max_new_tokens,
+            do_sample=False,
+        )
+
+        events = [
+            VideoEvent(
+                start=float(ev["start"]),
+                end=float(ev["end"]),
+                description=str(ev["description"]),
+            )
+            # "or []" also covers an explicit events=None in the model output.
+            for ev in raw_result.get("events") or []
+        ]
+
+        return CaptionResult(
+            scene=str(raw_result.get("scene", "")),
+            events=events,
+            caption=str(raw_result.get("caption", "")),
+            model=self.model_name,
+        )
+
+    def find(
+        self,
+        video_path: str,
+        event: str,
+        *,
+        max_new_tokens: int = 256,
+    ) -> FindResult:
+        """Ground a natural-language event query to a video time span.
+
+        Raises:
+            FileNotFoundError: when *video_path* is not an existing file.
+            ValueError: when *event* is empty or whitespace only.
+        """
+        self._require_video_file(video_path)
+        if not event.strip():
+            raise ValueError("event query must not be empty")
+
+        raw_result: dict = self._model.find(
+            video_path,
+            event=event,
+            max_new_tokens=max_new_tokens,
+            do_sample=False,
+        )
+
+        # Marlin returns span as a (start, end) tuple or None.
+        raw_span = raw_result.get("span")
+        span: tuple[float, float] | None = None
+        if raw_span is not None:
+            try:
+                span = (float(raw_span[0]), float(raw_span[1]))
+            except (TypeError, IndexError, KeyError, ValueError):
+                # KeyError covers a mapping-shaped span; an unparseable span
+                # is reported as None rather than failing the request.
+                logger.warning(
+                    "MarlinBackend.find: could not parse span %r for event %r",
+                    raw_span, event,
+                )
+
+        return FindResult(
+            span=span,
+            format_ok=bool(raw_result.get("format_ok", False)),
+            raw=str(raw_result.get("raw", "")),
+            model=self.model_name,
+        )
+
+    # ── Properties ───────────────────────────────────────────────────────────
+
+    @property
+    def model_name(self) -> str:
+        """The model path this backend was constructed with."""
+        return self._model_path
+
+    @property
+    def vram_mb(self) -> int:
+        """VRAM figure in MB captured once, right after the model was loaded."""
+        return self._vram_mb
diff --git a/circuitforge_core/video/backends/mock.py b/circuitforge_core/video/backends/mock.py
new file mode 100644
index 0000000..f8864a4
--- /dev/null
+++ b/circuitforge_core/video/backends/mock.py
@@ -0,0 +1,68 @@
+"""
+MockVideoBackend — deterministic stub for unit tests and CI.
+
+Returns fixed CaptionResult / FindResult without any model or video I/O.
+"""
+from __future__ import annotations
+
+import os
+
+from circuitforge_core.video.backends.base import (
+    CaptionResult,
+    FindResult,
+    VideoEvent,
+)
+
+_MOCK_SCENE = "A mock scene with placeholder content."
+_MOCK_EVENTS = [
+    VideoEvent(start=0.0, end=3.0, description="Mock event one"),
+    VideoEvent(start=3.5, end=7.2, description="Mock event two"),
+]
+_MOCK_CAPTION = "Scene: A mock scene with placeholder content. Events: [0.0-3.0] Mock event one. [3.5-7.2] Mock event two."
+_MOCK_FIND_SPAN = (3.5, 7.2)
+
+
+class MockVideoBackend:
+    """No-GPU stub. Safe for import on any machine."""
+
+    def __init__(self, model_path: str = "mock") -> None:
+        """Remember *model_path*; it is only echoed back as the model name."""
+        self._model_path = model_path
+
+    def caption(
+        self,
+        video_path: str,
+        *,
+        max_new_tokens: int = 2048,
+    ) -> CaptionResult:
+        """Return the canned caption; *max_new_tokens* is accepted and ignored.
+
+        Raises:
+            FileNotFoundError: when *video_path* is not an existing file.
+        """
+        # isfile() rather than exists(): a directory is not a video.
+        if not os.path.isfile(video_path):
+            raise FileNotFoundError(f"Video not found: {video_path!r}")
+        return CaptionResult(
+            scene=_MOCK_SCENE,
+            events=list(_MOCK_EVENTS),  # copy so callers cannot mutate the template
+            caption=_MOCK_CAPTION,
+            model=self.model_name,
+        )
+
+    def find(
+        self,
+        video_path: str,
+        event: str,
+        *,
+        max_new_tokens: int = 256,
+    ) -> FindResult:
+        """Return the canned span; *max_new_tokens* is accepted and ignored.
+
+        Raises:
+            FileNotFoundError: when *video_path* is not an existing file.
+            ValueError: when *event* is empty or whitespace only.
+        """
+        if not os.path.isfile(video_path):
+            raise FileNotFoundError(f"Video not found: {video_path!r}")
+        # Same input contract as MarlinBackend.find, so tests written against
+        # the mock see the error the real backend would raise.
+        if not event.strip():
+            raise ValueError("event query must not be empty")
+        return FindResult(
+            span=_MOCK_FIND_SPAN,
+            format_ok=True,
+            raw="From 3.5 to 7.2.",
+            model=self.model_name,
+        )
+
+    @property
+    def model_name(self) -> str:
+        """The model path this stub was constructed with."""
+        return self._model_path
+
+    @property
+    def vram_mb(self) -> int:
+        """Always 0 for the stub."""
+        return 0
diff --git a/pyproject.toml b/pyproject.toml
index 70ebb2f..84c6a66 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -66,6 +66,20 @@ musicgen-service = [
     "uvicorn[standard]>=0.29",
     "python-multipart>=0.0.9",
 ]
+video-marlin = [
+    "torch>=2.11",
+    "transformers>=5.7.0",
+    "torchcodec",
+    "qwen-vl-utils>=0.0.14",
+    "av",
+    "Pillow>=10.0",
+    "accelerate>=0.27",
+]
+video-service = [
+    "circuitforge-core[video-marlin]",
+    "fastapi>=0.110",
+    "uvicorn[standard]>=0.29",
+]
 vision-siglip = [
     "torch>=2.0",
     "transformers>=4.40",
@@ -115,6 +129,18 @@ pdf = [
 vector = [
     "sqlite-vec>=0.1",
 ]
+mqtt = [
+    "aiomqtt>=2.0",
+]
+meshtastic-serial = [
+    "meshtastic>=2.5",
+    "pypubsub>=4.0",
+]
+meshtastic-service = [
+    "circuitforge-core[mqtt,meshtastic-serial]",
+    "fastapi>=0.110",
+    "uvicorn[standard]>=0.29",
+]
 dev = [
     "circuitforge-core[manage]",
     "pytest>=8.0",
diff --git a/tests/test_video/__init__.py b/tests/test_video/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/test_video/test_app.py b/tests/test_video/test_app.py
new file mode 100644
index 0000000..53eacc7
--- /dev/null
+++ b/tests/test_video/test_app.py
@@ -0,0 +1,236 @@
+"""
+Tests for the cf-video FastAPI app using mock backend.
+
+Tests run without GPU, torch, or a real video file.
+MockVideoBackend checks os.path.exists() but never reads video content,
+so a zero-byte placeholder is sufficient.
+"""
+from __future__ import annotations
+
+import pytest
+from fastapi.testclient import TestClient
+
+import circuitforge_core.video.app as video_app
+from circuitforge_core.video.backends.mock import MockVideoBackend
+
+
+# ── Fixtures ──────────────────────────────────────────────────────────────────
+
+
+@pytest.fixture(autouse=True)
+def inject_mock_backend():
+    """Install a MockVideoBackend for every test, then put the old backend back."""
+    previous = video_app._backend
+    video_app._backend = MockVideoBackend()
+    yield
+    video_app._backend = previous
+
+
+@pytest.fixture()
+def client():
+    """HTTP test client bound to the cf-video app."""
+    return TestClient(video_app.app)
+
+
+@pytest.fixture()
+def video_file(tmp_path):
+    """Path of a small placeholder file that passes the mock's existence check."""
+    placeholder = tmp_path / "sample.mp4"
+    placeholder.write_bytes(b"\x00" * 16)
+    return str(placeholder)
+
+
+def _post_caption(client, video_path, **extra):
+    """POST /caption with *video_path* plus any extra JSON fields."""
+    return client.post("/caption", json={"video_path": video_path, **extra})
+
+
+def _post_find(client, video_path, event, **extra):
+    """POST /find with *video_path*, *event*, plus any extra JSON fields."""
+    return client.post(
+        "/find",
+        json={"video_path": video_path, "event": event, **extra},
+    )
+
+
+# ── /health ───────────────────────────────────────────────────────────────────
+
+
+def test_health_returns_ok(client):
+    response = client.get("/health")
+    assert response.status_code == 200
+    body = response.json()
+    assert body["status"] == "ok"
+    assert body["model"] == "mock"
+    assert body["vram_mb"] == 0
+
+
+def test_health_503_when_no_backend(client):
+    video_app._backend = None
+    assert client.get("/health").status_code == 503
+
+
+# ── /caption ──────────────────────────────────────────────────────────────────
+
+
+def test_caption_returns_200(client, video_file):
+    assert _post_caption(client, video_file).status_code == 200
+
+
+def test_caption_response_has_scene(client, video_file):
+    body = _post_caption(client, video_file).json()
+    assert isinstance(body["scene"], str)
+    assert body["scene"]
+
+
+def test_caption_response_has_events(client, video_file):
+    body = _post_caption(client, video_file).json()
+    assert isinstance(body["events"], list)
+    assert len(body["events"]) >= 1
+
+
+def test_caption_events_have_timestamps(client, video_file):
+    body = _post_caption(client, video_file).json()
+    for event in body["events"]:
+        assert {"start", "end", "description"} <= event.keys()
+        assert event["start"] <= event["end"]
+
+
+def test_caption_response_has_caption(client, video_file):
+    body = _post_caption(client, video_file).json()
+    assert isinstance(body["caption"], str)
+    assert body["caption"]
+
+
+def test_caption_response_model_field(client, video_file):
+    body = _post_caption(client, video_file).json()
+    assert isinstance(body["model"], str)
+
+
+def test_caption_404_on_missing_file(client):
+    assert _post_caption(client, "/no/such/file.mp4").status_code == 404
+
+
+def test_caption_503_when_no_backend(client, video_file):
+    video_app._backend = None
+    assert _post_caption(client, video_file).status_code == 503
+
+
+def test_caption_custom_max_new_tokens(client, video_file):
+    response = _post_caption(client, video_file, max_new_tokens=512)
+    assert response.status_code == 200
+
+
+def test_caption_rejects_max_new_tokens_below_min(client, video_file):
+    response = _post_caption(client, video_file, max_new_tokens=10)
+    assert response.status_code == 422
+
+
+def test_caption_rejects_max_new_tokens_above_max(client, video_file):
+    response = _post_caption(client, video_file, max_new_tokens=99999)
+    assert response.status_code == 422
+
+
+# ── /find ─────────────────────────────────────────────────────────────────────
+
+
+def test_find_returns_200(client, video_file):
+    response = _post_find(client, video_file, "someone waves")
+    assert response.status_code == 200
+
+
+def test_find_response_has_span(client, video_file):
+    body = _post_find(client, video_file, "mock event").json()
+    # MockVideoBackend always returns a non-null span
+    span = body["span"]
+    assert span is not None
+    assert len(span) == 2
+    assert span[0] <= span[1]
+
+
+def test_find_span_is_list_of_floats(client, video_file):
+    body = _post_find(client, video_file, "mock event").json()
+    for bound in body["span"]:
+        assert isinstance(bound, float)
+
+
+def test_find_format_ok_field(client, video_file):
+    body = _post_find(client, video_file, "mock event").json()
+    assert body["format_ok"] is True
+
+
+def test_find_raw_field(client, video_file):
+    body = _post_find(client, video_file, "mock event").json()
+    assert isinstance(body["raw"], str)
+
+
+def test_find_model_field(client, video_file):
+    body = _post_find(client, video_file, "mock event").json()
+    assert isinstance(body["model"], str)
+
+
+def test_find_404_on_missing_file(client):
+    response = _post_find(client, "/no/such/file.mp4", "wave")
+    assert response.status_code == 404
+
+
+def test_find_503_when_no_backend(client, video_file):
+    video_app._backend = None
+    response = _post_find(client, video_file, "wave")
+    assert response.status_code == 503
+
+
+def test_find_rejects_empty_event(client, video_file):
+    response = _post_find(client, video_file, "")
+    assert response.status_code == 422
+
+
+def test_find_custom_max_new_tokens(client, video_file):
+    response = _post_find(client, video_file, "wave", max_new_tokens=128)
+    assert response.status_code == 200
+
+
+def test_find_rejects_max_new_tokens_below_min(client, video_file):
+    response = _post_find(client, video_file, "wave", max_new_tokens=10)
+    assert response.status_code == 422
+
+
+def test_find_rejects_max_new_tokens_above_max(client, video_file):
+    response = _post_find(client, video_file, "wave", max_new_tokens=99999)
+    assert response.status_code == 422
diff --git a/tests/test_video/test_mock_backend.py b/tests/test_video/test_mock_backend.py
new file mode 100644
index 0000000..7b26a79
--- /dev/null
+++ b/tests/test_video/test_mock_backend.py
@@ -0,0 +1,157 @@
+"""
+Tests for MockVideoBackend and the VideoBackend protocol.
+
+All tests run without a GPU, torch install, or any real video file
+(MockVideoBackend only checks os.path.exists, not video validity).
+"""
+from __future__ import annotations
+
+import os
+import tempfile
+
+import pytest
+
+from circuitforge_core.video.backends.base import (
+    CaptionResult,
+    FindResult,
+    VideoBackend,
+    VideoEvent,
+    make_video_backend,
+)
+from circuitforge_core.video.backends.mock import MockVideoBackend
+
+
+# ── Fixtures ──────────────────────────────────────────────────────────────────
+
+
+@pytest.fixture()
+def video_file(tmp_path):
+    """Path of a small placeholder file that passes the mock's existence check."""
+    placeholder = tmp_path / "test.mp4"
+    placeholder.write_bytes(b"\x00" * 16)  # placeholder bytes; mock never reads content
+    return str(placeholder)
+
+
+# ── Protocol conformance ──────────────────────────────────────────────────────
+
+
+def test_mock_satisfies_protocol():
+    assert isinstance(MockVideoBackend(), VideoBackend)
+
+
+def test_mock_model_name_default():
+    stub = MockVideoBackend()
+    assert stub.model_name == "mock"
+
+
+def test_mock_model_name_custom():
+    stub = MockVideoBackend(model_path="custom-path")
+    assert stub.model_name == "custom-path"
+
+
+def test_mock_vram_mb():
+    stub = MockVideoBackend()
+    assert stub.vram_mb == 0
+
+
+# ── 
caption() ─────────────────────────────────────────────────────────────────
+
+
+def test_caption_returns_caption_result(video_file):
+    outcome = MockVideoBackend().caption(video_file)
+    assert isinstance(outcome, CaptionResult)
+
+
+def test_caption_scene_is_str(video_file):
+    scene = MockVideoBackend().caption(video_file).scene
+    assert isinstance(scene, str)
+    assert scene  # non-empty
+
+
+def test_caption_events_are_video_events(video_file):
+    events = MockVideoBackend().caption(video_file).events
+    assert isinstance(events, list)
+    assert all(isinstance(event, VideoEvent) for event in events)
+
+
+def test_caption_events_have_numeric_timestamps(video_file):
+    for event in MockVideoBackend().caption(video_file).events:
+        assert isinstance(event.start, float)
+        assert isinstance(event.end, float)
+        assert event.start <= event.end
+
+
+def test_caption_caption_str(video_file):
+    text = MockVideoBackend().caption(video_file).caption
+    assert isinstance(text, str)
+    assert text
+
+
+def test_caption_model_matches_path(video_file):
+    stub = MockVideoBackend(model_path="test-model")
+    assert stub.caption(video_file).model == "test-model"
+
+
+def test_caption_raises_on_missing_file():
+    stub = MockVideoBackend()
+    with pytest.raises(FileNotFoundError):
+        stub.caption("/nonexistent/video.mp4")
+
+
+def test_caption_max_new_tokens_accepted(video_file):
+    """The max_new_tokens keyword is accepted without raising."""
+    outcome = MockVideoBackend().caption(video_file, max_new_tokens=512)
+    assert isinstance(outcome, CaptionResult)
+
+
+# ── find() ────────────────────────────────────────────────────────────────────
+
+
+def test_find_returns_find_result(video_file):
+    outcome = MockVideoBackend().find(video_file, "someone waves")
+    assert isinstance(outcome, FindResult)
+
+
+def test_find_span_is_tuple_or_none(video_file):
+    span = MockVideoBackend().find(video_file, "mock event").span
+    # MockVideoBackend always returns a span
+    assert span is not None
+    assert len(span) == 2
+    assert span[0] <= span[1]
+
+
+def test_find_format_ok_true(video_file):
+    outcome = MockVideoBackend().find(video_file, "mock event")
+    assert outcome.format_ok is True
+
+
+def test_find_raw_is_str(video_file):
+    outcome = MockVideoBackend().find(video_file, "mock event")
+    assert isinstance(outcome.raw, str)
+
+
+def test_find_model_matches_path(video_file):
+    stub = MockVideoBackend(model_path="my-model")
+    assert stub.find(video_file, "event").model == "my-model"
+
+
+def test_find_raises_on_missing_file():
+    stub = MockVideoBackend()
+    with pytest.raises(FileNotFoundError):
+        stub.find("/nonexistent/video.mp4", "event")
+
+
+def test_find_max_new_tokens_accepted(video_file):
+    outcome = MockVideoBackend().find(video_file, "event", max_new_tokens=128)
+    assert isinstance(outcome, FindResult)
+
+
+# ── make_video_backend factory ────────────────────────────────────────────────
+
+
+def test_factory_returns_mock_when_flag_set():
+    built = make_video_backend(model_path="mock", mock=True)
+    assert isinstance(built, MockVideoBackend)
+
+
+def test_factory_mock_uses_model_path():
+    built = make_video_backend(model_path="some-path", mock=True)
+    assert built.model_name == "some-path"