diff --git a/circuitforge_core/resources/__init__.py b/circuitforge_core/resources/__init__.py index e69de29..8bf5235 100644 --- a/circuitforge_core/resources/__init__.py +++ b/circuitforge_core/resources/__init__.py @@ -0,0 +1 @@ +from circuitforge_core.resources.client import CFOrchClient, Allocation # noqa: F401 diff --git a/circuitforge_core/resources/client.py b/circuitforge_core/resources/client.py new file mode 100644 index 0000000..fa0a72e --- /dev/null +++ b/circuitforge_core/resources/client.py @@ -0,0 +1,126 @@ +from __future__ import annotations + +import logging +from contextlib import contextmanager, asynccontextmanager +from dataclasses import dataclass + +import httpx + +logger = logging.getLogger(__name__) + + +@dataclass +class Allocation: + allocation_id: str + service: str + node_id: str + gpu_id: int + model: str | None + url: str + started: bool + warm: bool + + +class CFOrchClient: + """ + Client for cf-orch coordinator allocation. + + Sync usage (in LLMRouter or other sync code): + client = CFOrchClient(os.environ["CF_ORCH_URL"]) + with client.allocate("vllm", model_candidates=["Ouro-1.4B"]) as alloc: + # alloc.url is the inference endpoint + + Async usage (in FastAPI apps): + async with client.allocate_async("vllm", model_candidates=["Ouro-1.4B"]) as alloc: + ... + + Raises ValueError immediately if coordinator_url is empty. + """ + + def __init__(self, coordinator_url: str) -> None: + if not coordinator_url: + raise ValueError("coordinator_url is empty — cf-orch not configured") + self._url = coordinator_url.rstrip("/") + + def _build_body(self, model_candidates: list[str] | None, ttl_s: float, caller: str) -> dict: + return { + "model_candidates": model_candidates or [], + "ttl_s": ttl_s, + "caller": caller, + } + + def _parse_allocation(self, data: dict, service: str) -> Allocation: + return Allocation( + allocation_id=data["allocation_id"], + service=service, + node_id=data["node_id"], + gpu_id=data["gpu_id"], + model=data.get("model"), + url=data["url"], + started=data.get("started", False), + warm=data.get("warm", False), + ) + + @contextmanager + def allocate( + self, + service: str, + *, + model_candidates: list[str] | None = None, + ttl_s: float = 3600.0, + caller: str = "", + ): + """Sync context manager. Allocates on enter, releases on exit.""" + resp = httpx.post( + f"{self._url}/api/services/{service}/allocate", + json=self._build_body(model_candidates, ttl_s, caller), + timeout=120.0, + ) + if not resp.is_success: + raise RuntimeError( + f"cf-orch allocation failed for {service!r}: " + f"HTTP {resp.status_code} — {resp.text[:200]}" + ) + alloc = self._parse_allocation(resp.json(), service) + try: + yield alloc + finally: + try: + httpx.delete( + f"{self._url}/api/services/{service}/allocations/{alloc.allocation_id}", + timeout=10.0, + ) + except Exception as exc: + logger.debug("cf-orch release failed (non-fatal): %s", exc) + + @asynccontextmanager + async def allocate_async( + self, + service: str, + *, + model_candidates: list[str] | None = None, + ttl_s: float = 3600.0, + caller: str = "", + ): + """Async context manager. Allocates on enter, releases on exit.""" + async with httpx.AsyncClient(timeout=120.0) as client: + resp = await client.post( + f"{self._url}/api/services/{service}/allocate", + json=self._build_body(model_candidates, ttl_s, caller), + ) + if not resp.is_success: + raise RuntimeError( + f"cf-orch allocation failed for {service!r}: " + f"HTTP {resp.status_code} — {resp.text[:200]}" + ) + alloc = self._parse_allocation(resp.json(), service) + try: + yield alloc + finally: + try: + await client.delete( + f"{self._url}/api/services/{service}/allocations/{alloc.allocation_id}", + timeout=10.0, + ) + except Exception as exc: + logger.debug("cf-orch async release failed (non-fatal): %s", exc) diff --git a/tests/test_resources/test_client.py b/tests/test_resources/test_client.py new file mode 100644 index 0000000..288fb64 --- /dev/null +++ b/tests/test_resources/test_client.py @@ -0,0 +1,94 @@ +import json +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +import httpretty +from circuitforge_core.resources.client import CFOrchClient, Allocation + +_ALLOC_BODY = ( + '{"allocation_id":"abc123","service":"vllm","node_id":"heimdall",' + '"gpu_id":0,"model":"Ouro-1.4B","url":"http://heimdall:8000","started":false,"warm":true}' +) + + +@httpretty.activate +def test_sync_allocate_returns_allocation(): + httpretty.register_uri( + httpretty.POST, "http://orch:7700/api/services/vllm/allocate", + body=_ALLOC_BODY, content_type="application/json", + ) + httpretty.register_uri( + httpretty.DELETE, "http://orch:7700/api/services/vllm/allocations/abc123", + body='{"released":true}', content_type="application/json", + ) + client = CFOrchClient("http://orch:7700") + with client.allocate("vllm", model_candidates=["Ouro-1.4B"], caller="test") as alloc: + assert isinstance(alloc, Allocation) + assert alloc.url == "http://heimdall:8000" + assert alloc.model == "Ouro-1.4B" + assert alloc.allocation_id == "abc123" + assert httpretty.last_request().method == "DELETE" + + +@httpretty.activate +def test_sync_allocate_ignores_404_on_release(): + httpretty.register_uri( + httpretty.POST, "http://orch:7700/api/services/vllm/allocate", + body='{"allocation_id":"xyz","service":"vllm","node_id":"a","gpu_id":0,' + '"model":"m","url":"http://a:8000","started":false,"warm":false}', + content_type="application/json", + ) + httpretty.register_uri( + httpretty.DELETE, "http://orch:7700/api/services/vllm/allocations/xyz", + status=404, body='{"detail":"not found"}', content_type="application/json", + ) + client = CFOrchClient("http://orch:7700") + with client.allocate("vllm", model_candidates=["m"]) as alloc: + assert alloc.url == "http://a:8000" + # No exception raised — 404 on release is silently ignored + + +@httpretty.activate +def test_sync_allocate_raises_on_503(): + httpretty.register_uri( + httpretty.POST, "http://orch:7700/api/services/vllm/allocate", + status=503, body='{"detail":"no capacity"}', content_type="application/json", + ) + client = CFOrchClient("http://orch:7700") + with pytest.raises(RuntimeError, match="cf-orch allocation failed"): + with client.allocate("vllm", model_candidates=["m"]): + pass + + +async def test_async_allocate_works(): + # httpretty only patches stdlib sockets; httpx async uses anyio sockets so + # we mock httpx.AsyncClient directly instead. + alloc_data = { + "allocation_id": "a1", "service": "vllm", "node_id": "n", + "gpu_id": 0, "model": "m", "url": "http://n:8000", + "started": False, "warm": False, + } + release_data = {"released": True} + + def _make_response(data, status_code=200): + resp = MagicMock() + resp.is_success = status_code < 400 + resp.status_code = status_code + resp.json.return_value = data + return resp + + mock_post = AsyncMock(return_value=_make_response(alloc_data)) + mock_delete = AsyncMock(return_value=_make_response(release_data)) + + mock_async_client = MagicMock() + mock_async_client.post = mock_post + mock_async_client.delete = mock_delete + mock_async_client.__aenter__ = AsyncMock(return_value=mock_async_client) + mock_async_client.__aexit__ = AsyncMock(return_value=False) + + with patch("httpx.AsyncClient", return_value=mock_async_client): + client = CFOrchClient("http://orch:7700") + async with client.allocate_async("vllm", model_candidates=["m"]) as alloc: + assert alloc.url == "http://n:8000" + assert alloc.allocation_id == "a1" + mock_delete.assert_called_once()