feat(core): add CFOrchClient sync+async context manager
Implements CFOrchClient with allocate() (sync contextmanager) and allocate_async() (async contextmanager) for cf-orch GPU resource allocation. Releases allocation on exit; ignores 404 on release; raises RuntimeError on non-2xx allocation response. Exports CFOrchClient and Allocation from circuitforge_core.resources. Note: async test uses unittest.mock rather than httpretty — httpretty only patches stdlib sockets and does not intercept httpx async (anyio) transport.
This commit is contained in:
parent
8201f6b3e9
commit
defaf39883
3 changed files with 221 additions and 0 deletions
|
|
@ -0,0 +1 @@
|
|||
from circuitforge_core.resources.client import CFOrchClient, Allocation # noqa: F401
|
||||
126
circuitforge_core/resources/client.py
Normal file
126
circuitforge_core/resources/client.py
Normal file
|
|
@ -0,0 +1,126 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from contextlib import contextmanager, asynccontextmanager
|
||||
from dataclasses import dataclass
|
||||
|
||||
import httpx
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class Allocation:
|
||||
allocation_id: str
|
||||
service: str
|
||||
node_id: str
|
||||
gpu_id: int
|
||||
model: str | None
|
||||
url: str
|
||||
started: bool
|
||||
warm: bool
|
||||
|
||||
|
||||
class CFOrchClient:
|
||||
"""
|
||||
Client for cf-orch coordinator allocation.
|
||||
|
||||
Sync usage (in LLMRouter or other sync code):
|
||||
client = CFOrchClient(os.environ["CF_ORCH_URL"])
|
||||
with client.allocate("vllm", model_candidates=["Ouro-1.4B"]) as alloc:
|
||||
# alloc.url is the inference endpoint
|
||||
|
||||
Async usage (in FastAPI apps):
|
||||
async with client.allocate_async("vllm", model_candidates=["Ouro-1.4B"]) as alloc:
|
||||
...
|
||||
|
||||
Raises ValueError immediately if coordinator_url is empty.
|
||||
"""
|
||||
|
||||
def __init__(self, coordinator_url: str) -> None:
|
||||
if not coordinator_url:
|
||||
raise ValueError("coordinator_url is empty — cf-orch not configured")
|
||||
self._url = coordinator_url.rstrip("/")
|
||||
|
||||
def _build_body(self, model_candidates: list[str] | None, ttl_s: float, caller: str) -> dict:
|
||||
return {
|
||||
"model_candidates": model_candidates or [],
|
||||
"ttl_s": ttl_s,
|
||||
"caller": caller,
|
||||
}
|
||||
|
||||
def _parse_allocation(self, data: dict, service: str) -> Allocation:
|
||||
return Allocation(
|
||||
allocation_id=data["allocation_id"],
|
||||
service=service,
|
||||
node_id=data["node_id"],
|
||||
gpu_id=data["gpu_id"],
|
||||
model=data.get("model"),
|
||||
url=data["url"],
|
||||
started=data.get("started", False),
|
||||
warm=data.get("warm", False),
|
||||
)
|
||||
|
||||
@contextmanager
|
||||
def allocate(
|
||||
self,
|
||||
service: str,
|
||||
*,
|
||||
model_candidates: list[str] | None = None,
|
||||
ttl_s: float = 3600.0,
|
||||
caller: str = "",
|
||||
):
|
||||
"""Sync context manager. Allocates on enter, releases on exit."""
|
||||
resp = httpx.post(
|
||||
f"{self._url}/api/services/{service}/allocate",
|
||||
json=self._build_body(model_candidates, ttl_s, caller),
|
||||
timeout=120.0,
|
||||
)
|
||||
if not resp.is_success:
|
||||
raise RuntimeError(
|
||||
f"cf-orch allocation failed for {service!r}: "
|
||||
f"HTTP {resp.status_code} — {resp.text[:200]}"
|
||||
)
|
||||
alloc = self._parse_allocation(resp.json(), service)
|
||||
try:
|
||||
yield alloc
|
||||
finally:
|
||||
try:
|
||||
httpx.delete(
|
||||
f"{self._url}/api/services/{service}/allocations/{alloc.allocation_id}",
|
||||
timeout=10.0,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.debug("cf-orch release failed (non-fatal): %s", exc)
|
||||
|
||||
@asynccontextmanager
|
||||
async def allocate_async(
|
||||
self,
|
||||
service: str,
|
||||
*,
|
||||
model_candidates: list[str] | None = None,
|
||||
ttl_s: float = 3600.0,
|
||||
caller: str = "",
|
||||
):
|
||||
"""Async context manager. Allocates on enter, releases on exit."""
|
||||
async with httpx.AsyncClient(timeout=120.0) as client:
|
||||
resp = await client.post(
|
||||
f"{self._url}/api/services/{service}/allocate",
|
||||
json=self._build_body(model_candidates, ttl_s, caller),
|
||||
)
|
||||
if not resp.is_success:
|
||||
raise RuntimeError(
|
||||
f"cf-orch allocation failed for {service!r}: "
|
||||
f"HTTP {resp.status_code} — {resp.text[:200]}"
|
||||
)
|
||||
alloc = self._parse_allocation(resp.json(), service)
|
||||
try:
|
||||
yield alloc
|
||||
finally:
|
||||
try:
|
||||
await client.delete(
|
||||
f"{self._url}/api/services/{service}/allocations/{alloc.allocation_id}",
|
||||
timeout=10.0,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.debug("cf-orch async release failed (non-fatal): %s", exc)
|
||||
94
tests/test_resources/test_client.py
Normal file
94
tests/test_resources/test_client.py
Normal file
|
|
@ -0,0 +1,94 @@
|
|||
import json
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
import httpretty
|
||||
from circuitforge_core.resources.client import CFOrchClient, Allocation
|
||||
|
||||
_ALLOC_BODY = (
|
||||
'{"allocation_id":"abc123","service":"vllm","node_id":"heimdall",'
|
||||
'"gpu_id":0,"model":"Ouro-1.4B","url":"http://heimdall:8000","started":false,"warm":true}'
|
||||
)
|
||||
|
||||
|
||||
@httpretty.activate
|
||||
def test_sync_allocate_returns_allocation():
|
||||
httpretty.register_uri(
|
||||
httpretty.POST, "http://orch:7700/api/services/vllm/allocate",
|
||||
body=_ALLOC_BODY, content_type="application/json",
|
||||
)
|
||||
httpretty.register_uri(
|
||||
httpretty.DELETE, "http://orch:7700/api/services/vllm/allocations/abc123",
|
||||
body='{"released":true}', content_type="application/json",
|
||||
)
|
||||
client = CFOrchClient("http://orch:7700")
|
||||
with client.allocate("vllm", model_candidates=["Ouro-1.4B"], caller="test") as alloc:
|
||||
assert isinstance(alloc, Allocation)
|
||||
assert alloc.url == "http://heimdall:8000"
|
||||
assert alloc.model == "Ouro-1.4B"
|
||||
assert alloc.allocation_id == "abc123"
|
||||
assert httpretty.last_request().method == "DELETE"
|
||||
|
||||
|
||||
@httpretty.activate
|
||||
def test_sync_allocate_ignores_404_on_release():
|
||||
httpretty.register_uri(
|
||||
httpretty.POST, "http://orch:7700/api/services/vllm/allocate",
|
||||
body='{"allocation_id":"xyz","service":"vllm","node_id":"a","gpu_id":0,'
|
||||
'"model":"m","url":"http://a:8000","started":false,"warm":false}',
|
||||
content_type="application/json",
|
||||
)
|
||||
httpretty.register_uri(
|
||||
httpretty.DELETE, "http://orch:7700/api/services/vllm/allocations/xyz",
|
||||
status=404, body='{"detail":"not found"}', content_type="application/json",
|
||||
)
|
||||
client = CFOrchClient("http://orch:7700")
|
||||
with client.allocate("vllm", model_candidates=["m"]) as alloc:
|
||||
assert alloc.url == "http://a:8000"
|
||||
# No exception raised — 404 on release is silently ignored
|
||||
|
||||
|
||||
@httpretty.activate
|
||||
def test_sync_allocate_raises_on_503():
|
||||
httpretty.register_uri(
|
||||
httpretty.POST, "http://orch:7700/api/services/vllm/allocate",
|
||||
status=503, body='{"detail":"no capacity"}', content_type="application/json",
|
||||
)
|
||||
client = CFOrchClient("http://orch:7700")
|
||||
with pytest.raises(RuntimeError, match="cf-orch allocation failed"):
|
||||
with client.allocate("vllm", model_candidates=["m"]):
|
||||
pass
|
||||
|
||||
|
||||
async def test_async_allocate_works():
|
||||
# httpretty only patches stdlib sockets; httpx async uses anyio sockets so
|
||||
# we mock httpx.AsyncClient directly instead.
|
||||
alloc_data = {
|
||||
"allocation_id": "a1", "service": "vllm", "node_id": "n",
|
||||
"gpu_id": 0, "model": "m", "url": "http://n:8000",
|
||||
"started": False, "warm": False,
|
||||
}
|
||||
release_data = {"released": True}
|
||||
|
||||
def _make_response(data, status_code=200):
|
||||
resp = MagicMock()
|
||||
resp.is_success = status_code < 400
|
||||
resp.status_code = status_code
|
||||
resp.json.return_value = data
|
||||
return resp
|
||||
|
||||
mock_post = AsyncMock(return_value=_make_response(alloc_data))
|
||||
mock_delete = AsyncMock(return_value=_make_response(release_data))
|
||||
|
||||
mock_async_client = MagicMock()
|
||||
mock_async_client.post = mock_post
|
||||
mock_async_client.delete = mock_delete
|
||||
mock_async_client.__aenter__ = AsyncMock(return_value=mock_async_client)
|
||||
mock_async_client.__aexit__ = AsyncMock(return_value=False)
|
||||
|
||||
with patch("httpx.AsyncClient", return_value=mock_async_client):
|
||||
client = CFOrchClient("http://orch:7700")
|
||||
async with client.allocate_async("vllm", model_candidates=["m"]) as alloc:
|
||||
assert alloc.url == "http://n:8000"
|
||||
assert alloc.allocation_id == "a1"
|
||||
mock_delete.assert_called_once()
|
||||
Loading…
Reference in a new issue