feat: wire ServiceRegistry into coordinator allocate endpoints

This commit is contained in:
pyr0ball 2026-04-02 12:30:58 -07:00
parent c299482e0d
commit 49ab9e4e88
5 changed files with 139 additions and 8 deletions

View file

@ -1,6 +1,5 @@
from __future__ import annotations
import uuid as _uuid
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any
@ -14,6 +13,7 @@ from circuitforge_core.resources.coordinator.eviction_engine import EvictionEngi
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
from circuitforge_core.resources.coordinator.node_selector import select_node
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry
_DASHBOARD_HTML = (Path(__file__).parent / "dashboard.html").read_text()
@ -54,6 +54,7 @@ def create_coordinator_app(
lease_manager: LeaseManager,
profile_registry: ProfileRegistry,
agent_supervisor: AgentSupervisor,
service_registry: ServiceRegistry,
) -> FastAPI:
eviction_engine = EvictionEngine(lease_manager=lease_manager)
@ -307,8 +308,17 @@ def create_coordinator_app(
)
if resp.is_success:
data = resp.json()
alloc = service_registry.allocate(
service=service,
node_id=node_id,
gpu_id=gpu_id,
model=model,
caller=req.caller,
url=data.get("url", ""),
ttl_s=req.ttl_s,
)
return {
"allocation_id": str(_uuid.uuid4()),
"allocation_id": alloc.allocation_id,
"service": service,
"node_id": node_id,
"gpu_id": gpu_id,
@ -326,6 +336,61 @@ def create_coordinator_app(
detail=f"All model candidates exhausted for {service!r}. Last error: {last_error}",
)
@app.delete("/api/services/{service}/allocations/{allocation_id}")
async def release_allocation(service: str, allocation_id: str) -> dict[str, Any]:
released = service_registry.release(allocation_id)
if not released:
raise HTTPException(404, detail=f"Allocation {allocation_id!r} not found")
return {"released": True, "allocation_id": allocation_id}
@app.get("/api/services/{service}/status")
def get_service_status(service: str) -> dict[str, Any]:
instances = [i for i in service_registry.all_instances() if i.service == service]
allocations = [a for a in service_registry.all_allocations() if a.service == service]
return {
"service": service,
"instances": [
{
"node_id": i.node_id,
"gpu_id": i.gpu_id,
"state": i.state,
"model": i.model,
"url": i.url,
"idle_since": i.idle_since,
}
for i in instances
],
"allocations": [
{
"allocation_id": a.allocation_id,
"node_id": a.node_id,
"gpu_id": a.gpu_id,
"model": a.model,
"caller": a.caller,
"url": a.url,
"expires_at": a.expires_at,
}
for a in allocations
],
}
@app.get("/api/services")
def list_services() -> dict[str, Any]:
instances = service_registry.all_instances()
return {
"services": [
{
"service": i.service,
"node_id": i.node_id,
"gpu_id": i.gpu_id,
"state": i.state,
"model": i.model,
"url": i.url,
}
for i in instances
]
}
@app.delete("/api/services/{service}")
async def stop_service(service: str, node_id: str) -> dict[str, Any]:
"""Stop a managed service on the given node."""

View file

@ -121,6 +121,9 @@ class ServiceRegistry:
self._instances[key] = inst
return inst
def all_allocations(self) -> list[ServiceAllocation]:
return list(self._allocations.values())
def all_instances(self) -> list[ServiceInstance]:
return list(self._instances.values())

View file

@ -4,6 +4,7 @@ from fastapi.testclient import TestClient
from circuitforge_core.resources.coordinator.app import create_coordinator_app
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry
from circuitforge_core.resources.coordinator.agent_supervisor import AgentRecord
from circuitforge_core.resources.models import GpuInfo, NodeInfo
@ -28,12 +29,13 @@ def alloc_client():
lm = LeaseManager()
pr = ProfileRegistry()
sup = _make_supervisor_mock()
app = create_coordinator_app(lease_manager=lm, profile_registry=pr, agent_supervisor=sup)
return TestClient(app), sup
sr = ServiceRegistry()
app = create_coordinator_app(lease_manager=lm, profile_registry=pr, agent_supervisor=sup, service_registry=sr)
return TestClient(app), sup, sr
def test_allocate_returns_allocation_id_and_url(alloc_client):
client, sup = alloc_client
client, sup, sr = alloc_client
with patch("httpx.AsyncClient") as mock_http:
mock_resp = MagicMock()
mock_resp.is_success = True
@ -55,19 +57,76 @@ def test_allocate_returns_allocation_id_and_url(alloc_client):
def test_allocate_returns_503_when_no_online_nodes(alloc_client):
client, sup = alloc_client
client, sup, sr = alloc_client
sup.online_agents.return_value = {}
resp = client.post("/api/services/vllm/allocate", json={"model_candidates": ["Ouro-1.4B"]})
assert resp.status_code == 503
def test_allocate_returns_422_for_empty_candidates(alloc_client):
client, _ = alloc_client
client, _, sr = alloc_client
resp = client.post("/api/services/vllm/allocate", json={"model_candidates": []})
assert resp.status_code == 422
def test_allocate_returns_422_for_unknown_service(alloc_client):
client, _ = alloc_client
client, _, sr = alloc_client
resp = client.post("/api/services/cf-made-up/allocate", json={"model_candidates": ["x"]})
assert resp.status_code == 422
def test_allocate_records_in_registry(alloc_client):
client, sup, sr = alloc_client
with patch("httpx.AsyncClient") as mock_http:
mock_resp = MagicMock()
mock_resp.is_success = True
mock_resp.json.return_value = {"running": True, "url": "http://heimdall:8000"}
mock_http.return_value.__aenter__.return_value.post = AsyncMock(return_value=mock_resp)
resp = client.post("/api/services/vllm/allocate", json={
"model_candidates": ["Ouro-1.4B"],
"ttl_s": 300.0,
"caller": "test",
})
assert resp.status_code == 200
allocation_id = resp.json()["allocation_id"]
status_resp = client.get("/api/services/vllm/status")
assert status_resp.status_code == 200
status_data = status_resp.json()
assert status_data["service"] == "vllm"
alloc_ids = [a["allocation_id"] for a in status_data["allocations"]]
assert allocation_id in alloc_ids
def test_release_allocation(alloc_client):
client, sup, sr = alloc_client
with patch("httpx.AsyncClient") as mock_http:
mock_resp = MagicMock()
mock_resp.is_success = True
mock_resp.json.return_value = {"running": True, "url": "http://heimdall:8000"}
mock_http.return_value.__aenter__.return_value.post = AsyncMock(return_value=mock_resp)
resp = client.post("/api/services/vllm/allocate", json={
"model_candidates": ["Ouro-1.4B"],
"ttl_s": 300.0,
"caller": "test",
})
assert resp.status_code == 200
allocation_id = resp.json()["allocation_id"]
del_resp = client.delete(f"/api/services/vllm/allocations/{allocation_id}")
assert del_resp.status_code == 200
assert del_resp.json() == {"released": True, "allocation_id": allocation_id}
status_resp = client.get("/api/services/vllm/status")
alloc_ids = [a["allocation_id"] for a in status_resp.json()["allocations"]]
assert allocation_id not in alloc_ids
def test_release_allocation_not_found(alloc_client):
client, _, sr = alloc_client
resp = client.delete("/api/services/vllm/allocations/bad-id")
assert resp.status_code == 404

View file

@ -6,6 +6,7 @@ from circuitforge_core.resources.coordinator.app import create_coordinator_app
from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry
from circuitforge_core.resources.models import GpuInfo, NodeInfo
from circuitforge_core.resources.profiles.schema import load_profile
@ -35,6 +36,7 @@ def coordinator_client():
lease_manager=lease_manager,
profile_registry=profile_registry,
agent_supervisor=supervisor,
service_registry=ServiceRegistry(),
)
return TestClient(app), lease_manager

View file

@ -11,6 +11,7 @@ from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor
from circuitforge_core.resources.coordinator.app import create_coordinator_app
from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry
from circuitforge_core.resources.models import GpuInfo, NodeInfo
@ -47,6 +48,7 @@ def system():
lease_manager=lease_manager,
profile_registry=profile_registry,
agent_supervisor=mock_supervisor,
service_registry=ServiceRegistry(),
)
client = TestClient(app)
return client, lease_manager