From 49ab9e4e8875b5bdf8e1220bcb0afc3f2c5fe3e9 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Thu, 2 Apr 2026 12:30:58 -0700 Subject: [PATCH] feat: wire ServiceRegistry into coordinator allocate endpoints --- .../resources/coordinator/app.py | 69 +++++++++++++++++- .../resources/coordinator/service_registry.py | 3 + .../test_coordinator_allocate.py | 71 +++++++++++++++++-- tests/test_resources/test_coordinator_app.py | 2 + tests/test_resources/test_integration.py | 2 + 5 files changed, 139 insertions(+), 8 deletions(-) diff --git a/circuitforge_core/resources/coordinator/app.py b/circuitforge_core/resources/coordinator/app.py index 6f4e66e..617cb87 100644 --- a/circuitforge_core/resources/coordinator/app.py +++ b/circuitforge_core/resources/coordinator/app.py @@ -1,6 +1,5 @@ from __future__ import annotations -import uuid as _uuid from contextlib import asynccontextmanager from pathlib import Path from typing import Any @@ -14,6 +13,7 @@ from circuitforge_core.resources.coordinator.eviction_engine import EvictionEngi from circuitforge_core.resources.coordinator.lease_manager import LeaseManager from circuitforge_core.resources.coordinator.node_selector import select_node from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry +from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry _DASHBOARD_HTML = (Path(__file__).parent / "dashboard.html").read_text() @@ -54,6 +54,7 @@ def create_coordinator_app( lease_manager: LeaseManager, profile_registry: ProfileRegistry, agent_supervisor: AgentSupervisor, + service_registry: ServiceRegistry, ) -> FastAPI: eviction_engine = EvictionEngine(lease_manager=lease_manager) @@ -307,8 +308,17 @@ def create_coordinator_app( ) if resp.is_success: data = resp.json() + alloc = service_registry.allocate( + service=service, + node_id=node_id, + gpu_id=gpu_id, + model=model, + caller=req.caller, + url=data.get("url", ""), + ttl_s=req.ttl_s, + ) return { - "allocation_id": str(_uuid.uuid4()), + "allocation_id": alloc.allocation_id, "service": service, "node_id": node_id, "gpu_id": gpu_id, @@ -326,6 +336,61 @@ def create_coordinator_app( detail=f"All model candidates exhausted for {service!r}. Last error: {last_error}", ) + @app.delete("/api/services/{service}/allocations/{allocation_id}") + async def release_allocation(service: str, allocation_id: str) -> dict[str, Any]: + released = service_registry.release(allocation_id) + if not released: + raise HTTPException(404, detail=f"Allocation {allocation_id!r} not found") + return {"released": True, "allocation_id": allocation_id} + + @app.get("/api/services/{service}/status") + def get_service_status(service: str) -> dict[str, Any]: + instances = [i for i in service_registry.all_instances() if i.service == service] + allocations = [a for a in service_registry.all_allocations() if a.service == service] + return { + "service": service, + "instances": [ + { + "node_id": i.node_id, + "gpu_id": i.gpu_id, + "state": i.state, + "model": i.model, + "url": i.url, + "idle_since": i.idle_since, + } + for i in instances + ], + "allocations": [ + { + "allocation_id": a.allocation_id, + "node_id": a.node_id, + "gpu_id": a.gpu_id, + "model": a.model, + "caller": a.caller, + "url": a.url, + "expires_at": a.expires_at, + } + for a in allocations + ], + } + + @app.get("/api/services") + def list_services() -> dict[str, Any]: + instances = service_registry.all_instances() + return { + "services": [ + { + "service": i.service, + "node_id": i.node_id, + "gpu_id": i.gpu_id, + "state": i.state, + "model": i.model, + "url": i.url, + } + for i in instances + ] + } + @app.delete("/api/services/{service}") async def stop_service(service: str, node_id: str) -> dict[str, Any]: """Stop a managed service on the given node.""" diff --git a/circuitforge_core/resources/coordinator/service_registry.py b/circuitforge_core/resources/coordinator/service_registry.py index 3d0df1b..ee4aa8c 100644 --- a/circuitforge_core/resources/coordinator/service_registry.py +++ b/circuitforge_core/resources/coordinator/service_registry.py @@ -121,6 +121,9 @@ class ServiceRegistry: self._instances[key] = inst return inst + def all_allocations(self) -> list[ServiceAllocation]: + return list(self._allocations.values()) + def all_instances(self) -> list[ServiceInstance]: return list(self._instances.values()) diff --git a/tests/test_resources/test_coordinator_allocate.py b/tests/test_resources/test_coordinator_allocate.py index bab3714..eb9e078 100644 --- a/tests/test_resources/test_coordinator_allocate.py +++ b/tests/test_resources/test_coordinator_allocate.py @@ -4,6 +4,7 @@ from fastapi.testclient import TestClient from circuitforge_core.resources.coordinator.app import create_coordinator_app from circuitforge_core.resources.coordinator.lease_manager import LeaseManager from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry +from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry from circuitforge_core.resources.coordinator.agent_supervisor import AgentRecord from circuitforge_core.resources.models import GpuInfo, NodeInfo @@ -28,12 +29,13 @@ def alloc_client(): lm = LeaseManager() pr = ProfileRegistry() sup = _make_supervisor_mock() - app = create_coordinator_app(lease_manager=lm, profile_registry=pr, agent_supervisor=sup) - return TestClient(app), sup + sr = ServiceRegistry() + app = create_coordinator_app(lease_manager=lm, profile_registry=pr, agent_supervisor=sup, service_registry=sr) + return TestClient(app), sup, sr def test_allocate_returns_allocation_id_and_url(alloc_client): - client, sup = alloc_client + client, sup, sr = alloc_client with patch("httpx.AsyncClient") as mock_http: mock_resp = MagicMock() mock_resp.is_success = True @@ -55,19 +57,76 @@ def test_allocate_returns_allocation_id_and_url(alloc_client): def test_allocate_returns_503_when_no_online_nodes(alloc_client): - client, sup = alloc_client + client, sup, sr = alloc_client sup.online_agents.return_value = {} resp = client.post("/api/services/vllm/allocate", json={"model_candidates": ["Ouro-1.4B"]}) assert resp.status_code == 503 def test_allocate_returns_422_for_empty_candidates(alloc_client): - client, _ = alloc_client + client, _, sr = alloc_client resp = client.post("/api/services/vllm/allocate", json={"model_candidates": []}) assert resp.status_code == 422 def test_allocate_returns_422_for_unknown_service(alloc_client): - client, _ = alloc_client + client, _, sr = alloc_client resp = client.post("/api/services/cf-made-up/allocate", json={"model_candidates": ["x"]}) assert resp.status_code == 422 + + +def test_allocate_records_in_registry(alloc_client): + client, sup, sr = alloc_client + with patch("httpx.AsyncClient") as mock_http: + mock_resp = MagicMock() + mock_resp.is_success = True + mock_resp.json.return_value = {"running": True, "url": "http://heimdall:8000"} + mock_http.return_value.__aenter__.return_value.post = AsyncMock(return_value=mock_resp) + + resp = client.post("/api/services/vllm/allocate", json={ + "model_candidates": ["Ouro-1.4B"], + "ttl_s": 300.0, + "caller": "test", + }) + + assert resp.status_code == 200 + allocation_id = resp.json()["allocation_id"] + + status_resp = client.get("/api/services/vllm/status") + assert status_resp.status_code == 200 + status_data = status_resp.json() + assert status_data["service"] == "vllm" + alloc_ids = [a["allocation_id"] for a in status_data["allocations"]] + assert allocation_id in alloc_ids + + +def test_release_allocation(alloc_client): + client, sup, sr = alloc_client + with patch("httpx.AsyncClient") as mock_http: + mock_resp = MagicMock() + mock_resp.is_success = True + mock_resp.json.return_value = {"running": True, "url": "http://heimdall:8000"} + mock_http.return_value.__aenter__.return_value.post = AsyncMock(return_value=mock_resp) + + resp = client.post("/api/services/vllm/allocate", json={ + "model_candidates": ["Ouro-1.4B"], + "ttl_s": 300.0, + "caller": "test", + }) + + assert resp.status_code == 200 + allocation_id = resp.json()["allocation_id"] + + del_resp = client.delete(f"/api/services/vllm/allocations/{allocation_id}") + assert del_resp.status_code == 200 + assert del_resp.json() == {"released": True, "allocation_id": allocation_id} + + status_resp = client.get("/api/services/vllm/status") + alloc_ids = [a["allocation_id"] for a in status_resp.json()["allocations"]] + assert allocation_id not in alloc_ids + + +def test_release_allocation_not_found(alloc_client): + client, _, sr = alloc_client + resp = client.delete("/api/services/vllm/allocations/bad-id") + assert resp.status_code == 404 diff --git a/tests/test_resources/test_coordinator_app.py b/tests/test_resources/test_coordinator_app.py index 60d67c1..5981f8f 100644 --- a/tests/test_resources/test_coordinator_app.py +++ b/tests/test_resources/test_coordinator_app.py @@ -6,6 +6,7 @@ from circuitforge_core.resources.coordinator.app import create_coordinator_app from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor from circuitforge_core.resources.coordinator.lease_manager import LeaseManager from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry +from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry from circuitforge_core.resources.models import GpuInfo, NodeInfo from circuitforge_core.resources.profiles.schema import load_profile @@ -35,6 +36,7 @@ def coordinator_client(): lease_manager=lease_manager, profile_registry=profile_registry, agent_supervisor=supervisor, + service_registry=ServiceRegistry(), ) return TestClient(app), lease_manager diff --git a/tests/test_resources/test_integration.py b/tests/test_resources/test_integration.py index 814bb32..8fa94ad 100644 --- a/tests/test_resources/test_integration.py +++ b/tests/test_resources/test_integration.py @@ -11,6 +11,7 @@ from circuitforge_core.resources.coordinator.lease_manager import LeaseManager from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor from circuitforge_core.resources.coordinator.app import create_coordinator_app +from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry from circuitforge_core.resources.models import GpuInfo, NodeInfo @@ -47,6 +48,7 @@ def system(): lease_manager=lease_manager, profile_registry=profile_registry, agent_supervisor=mock_supervisor, + service_registry=ServiceRegistry(), ) client = TestClient(app) return client, lease_manager