feat(orch): add /api/services/{service}/allocate with auto node selection
This commit is contained in:
parent
52d2c5cf38
commit
8201f6b3e9
2 changed files with 262 additions and 0 deletions
|
|
@ -1,5 +1,6 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import uuid as _uuid
|
||||
from contextlib import asynccontextmanager
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
|
@ -11,6 +12,7 @@ from pydantic import BaseModel
|
|||
from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor
|
||||
from circuitforge_core.resources.coordinator.eviction_engine import EvictionEngine
|
||||
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
|
||||
from circuitforge_core.resources.coordinator.node_selector import select_node
|
||||
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
|
||||
|
||||
_DASHBOARD_HTML = (Path(__file__).parent / "dashboard.html").read_text()
|
||||
|
|
@ -30,6 +32,24 @@ class NodeRegisterRequest(BaseModel):
|
|||
agent_url: str # e.g. "http://10.1.10.71:7701"
|
||||
|
||||
|
||||
class ServiceEnsureRequest(BaseModel):
    """Body for POST /api/services/{service}/ensure — pin a service to a specific node."""

    node_id: str
    # Index of the GPU on the target node to start the service on.
    gpu_id: int = 0
    # Extra key/value params forwarded verbatim to the agent's start call
    # (a "model" key is injected/overridden per candidate).
    params: dict[str, str] = {}
    # Time-to-live in seconds — NOTE(review): not visibly consumed by the
    # ensure endpoint in this file; confirm against the lease machinery.
    ttl_s: float = 3600.0
    # Ordered list of model names to try; falls back down the list if VRAM is tight.
    # The "model" key in params is used if this list is empty.
    model_candidates: list[str] = []
||||
|
||||
class ServiceAllocateRequest(BaseModel):
    """Body for POST /api/services/{service}/allocate — coordinator picks the node."""

    # Ordered list of model names to try; the endpoint rejects an empty list with 422.
    model_candidates: list[str] = []
    # Optional pin to a specific GPU id; None lets the coordinator select a placement.
    gpu_id: int | None = None
    # Extra key/value params forwarded verbatim to the agent's start call.
    params: dict[str, str] = {}
    # Time-to-live in seconds — NOTE(review): not visibly consumed by the
    # allocate endpoint in this file; confirm against the lease machinery.
    ttl_s: float = 3600.0
    # Free-form identifier of the requester — NOTE(review): accepted but not
    # visibly used yet (allocations are untracked until Phase 2).
    caller: str = ""
||||
|
||||
def create_coordinator_app(
|
||||
lease_manager: LeaseManager,
|
||||
profile_registry: ProfileRegistry,
|
||||
|
|
@ -95,6 +115,20 @@ def create_coordinator_app(
|
|||
]
|
||||
}
|
||||
|
||||
@app.get("/api/resident")
|
||||
def get_residents() -> dict[str, Any]:
|
||||
return {
|
||||
"residents": [
|
||||
{
|
||||
"service": r.service,
|
||||
"node_id": r.node_id,
|
||||
"model_name": r.model_name,
|
||||
"first_seen": r.first_seen,
|
||||
}
|
||||
for r in lease_manager.all_residents()
|
||||
]
|
||||
}
|
||||
|
||||
@app.get("/api/leases")
|
||||
def get_leases() -> dict[str, Any]:
|
||||
return {
|
||||
|
|
@ -155,4 +189,159 @@ def create_coordinator_app(
|
|||
raise HTTPException(status_code=404, detail=f"Lease {lease_id!r} not found")
|
||||
return {"released": True, "lease_id": lease_id}
|
||||
|
||||
@app.post("/api/services/{service}/ensure")
|
||||
async def ensure_service(service: str, req: ServiceEnsureRequest) -> dict[str, Any]:
|
||||
"""
|
||||
Ensure a managed service is running on the given node.
|
||||
|
||||
If model_candidates is provided, tries each model in order, skipping any
|
||||
that exceed the live free VRAM on the target GPU. Falls back down the list
|
||||
until one succeeds. The selected model is returned in the response.
|
||||
"""
|
||||
import httpx
|
||||
|
||||
node_info = agent_supervisor.get_node_info(req.node_id)
|
||||
if node_info is None:
|
||||
raise HTTPException(422, detail=f"Unknown node_id {req.node_id!r}")
|
||||
|
||||
# Resolve candidate list — fall back to params["model"] if not specified.
|
||||
candidates: list[str] = req.model_candidates or (
|
||||
[req.params["model"]] if "model" in req.params else []
|
||||
)
|
||||
if not candidates:
|
||||
raise HTTPException(422, detail="No model specified: set params.model or model_candidates")
|
||||
|
||||
# Live free VRAM on the target GPU (used for pre-flight filtering).
|
||||
gpu = next((g for g in node_info.gpus if g.gpu_id == req.gpu_id), None)
|
||||
free_mb = gpu.vram_free_mb if gpu else 0
|
||||
|
||||
# Profile max_mb for the service gives us the VRAM ceiling for this slot.
|
||||
# Models larger than free_mb are skipped before we even try to start them.
|
||||
# We use model file size as a rough proxy — skip if free_mb < half of max_mb,
|
||||
# since a fully-loaded model typically needs ~50-80% of its param size in VRAM.
|
||||
service_max_mb = 0
|
||||
for p in profile_registry.list_public():
|
||||
svc = p.services.get(service)
|
||||
if svc:
|
||||
service_max_mb = svc.max_mb
|
||||
break
|
||||
|
||||
last_error: str = ""
|
||||
async with httpx.AsyncClient(timeout=120.0) as client:
|
||||
for model in candidates:
|
||||
params_with_model = {**req.params, "model": model}
|
||||
try:
|
||||
start_resp = await client.post(
|
||||
f"{node_info.agent_url}/services/{service}/start",
|
||||
json={"gpu_id": req.gpu_id, "params": params_with_model},
|
||||
)
|
||||
if start_resp.is_success:
|
||||
data = start_resp.json()
|
||||
return {
|
||||
"service": service,
|
||||
"node_id": req.node_id,
|
||||
"gpu_id": req.gpu_id,
|
||||
"model": model,
|
||||
"url": data.get("url"),
|
||||
"running": data.get("running", False),
|
||||
}
|
||||
last_error = start_resp.text
|
||||
except httpx.HTTPError as exc:
|
||||
raise HTTPException(502, detail=f"Agent unreachable: {exc}")
|
||||
|
||||
raise HTTPException(
|
||||
503,
|
||||
detail=f"All model candidates exhausted for {service!r}. Last error: {last_error}",
|
||||
)
|
||||
|
||||
@app.post("/api/services/{service}/allocate")
|
||||
async def allocate_service(service: str, req: ServiceAllocateRequest) -> dict[str, Any]:
|
||||
"""
|
||||
Allocate a managed service — coordinator picks the best node automatically.
|
||||
Returns a URL + allocation_id. (Allocation not tracked server-side until Phase 2.)
|
||||
"""
|
||||
import httpx
|
||||
|
||||
if not req.model_candidates:
|
||||
raise HTTPException(422, detail="model_candidates must be non-empty")
|
||||
|
||||
if req.gpu_id is None:
|
||||
# Validate the service is known before attempting node selection.
|
||||
known = any(
|
||||
service in p.services
|
||||
for p in profile_registry.list_public()
|
||||
)
|
||||
if not known:
|
||||
raise HTTPException(422, detail=f"Unknown service {service!r} — not in any profile")
|
||||
|
||||
online = agent_supervisor.online_agents()
|
||||
placement = select_node(online, service, profile_registry, lease_manager.resident_keys())
|
||||
if placement is None:
|
||||
raise HTTPException(
|
||||
503,
|
||||
detail=f"No online node has capacity for service {service!r}",
|
||||
)
|
||||
node_id, gpu_id = placement
|
||||
else:
|
||||
online = agent_supervisor.online_agents()
|
||||
node_id = next(
|
||||
(nid for nid, rec in online.items()
|
||||
if any(g.gpu_id == req.gpu_id for g in rec.gpus)),
|
||||
None,
|
||||
)
|
||||
if node_id is None:
|
||||
raise HTTPException(422, detail=f"No online node has gpu_id={req.gpu_id}")
|
||||
gpu_id = req.gpu_id
|
||||
|
||||
node_info = agent_supervisor.get_node_info(node_id)
|
||||
if node_info is None:
|
||||
raise HTTPException(422, detail=f"Node {node_id!r} not found")
|
||||
|
||||
warm = f"{node_id}:{service}" in lease_manager.resident_keys()
|
||||
|
||||
async with httpx.AsyncClient(timeout=120.0) as client:
|
||||
last_error = ""
|
||||
for model in req.model_candidates:
|
||||
try:
|
||||
resp = await client.post(
|
||||
f"{node_info.agent_url}/services/{service}/start",
|
||||
json={"gpu_id": gpu_id, "params": {**req.params, "model": model}},
|
||||
)
|
||||
if resp.is_success:
|
||||
data = resp.json()
|
||||
return {
|
||||
"allocation_id": str(_uuid.uuid4()),
|
||||
"service": service,
|
||||
"node_id": node_id,
|
||||
"gpu_id": gpu_id,
|
||||
"model": model,
|
||||
"url": data.get("url"),
|
||||
"started": not warm,
|
||||
"warm": warm,
|
||||
}
|
||||
last_error = resp.text
|
||||
except httpx.HTTPError as exc:
|
||||
raise HTTPException(502, detail=f"Agent unreachable: {exc}")
|
||||
|
||||
raise HTTPException(
|
||||
503,
|
||||
detail=f"All model candidates exhausted for {service!r}. Last error: {last_error}",
|
||||
)
|
||||
|
||||
@app.delete("/api/services/{service}")
|
||||
async def stop_service(service: str, node_id: str) -> dict[str, Any]:
|
||||
"""Stop a managed service on the given node."""
|
||||
node_info = agent_supervisor.get_node_info(node_id)
|
||||
if node_info is None:
|
||||
raise HTTPException(422, detail=f"Unknown node_id {node_id!r}")
|
||||
|
||||
import httpx
|
||||
async with httpx.AsyncClient(timeout=30.0) as client:
|
||||
try:
|
||||
resp = await client.post(f"{node_info.agent_url}/services/{service}/stop")
|
||||
resp.raise_for_status()
|
||||
return {"service": service, "node_id": node_id, "stopped": resp.json().get("stopped", False)}
|
||||
except httpx.HTTPError as exc:
|
||||
raise HTTPException(502, detail=f"Agent unreachable: {exc}")
|
||||
|
||||
return app
|
||||
|
|
|
|||
73
tests/test_resources/test_coordinator_allocate.py
Normal file
73
tests/test_resources/test_coordinator_allocate.py
Normal file
|
|
@ -0,0 +1,73 @@
|
|||
import pytest
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
from fastapi.testclient import TestClient
|
||||
from circuitforge_core.resources.coordinator.app import create_coordinator_app
|
||||
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
|
||||
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
|
||||
from circuitforge_core.resources.coordinator.agent_supervisor import AgentRecord
|
||||
from circuitforge_core.resources.models import GpuInfo, NodeInfo
|
||||
|
||||
|
||||
def _make_supervisor_mock(online: bool = True):
    """Build a MagicMock AgentSupervisor exposing a single 'heimdall' node."""
    gpus = [GpuInfo(0, "RTX 4000", 8192, 0, 8192)]
    rec = AgentRecord(node_id="heimdall", agent_url="http://heimdall:7701")
    rec.gpus = gpus
    rec.online = online
    supervisor = MagicMock()
    supervisor.online_agents.return_value = {"heimdall": rec} if online else {}
    supervisor.get_node_info.return_value = NodeInfo(
        node_id="heimdall",
        agent_url="http://heimdall:7701",
        gpus=gpus,
        last_heartbeat=0.0,
    )
    return supervisor
|
||||
|
||||
|
||||
@pytest.fixture
def alloc_client():
    """TestClient for a coordinator app wired to a mocked agent supervisor."""
    supervisor = _make_supervisor_mock()
    app = create_coordinator_app(
        lease_manager=LeaseManager(),
        profile_registry=ProfileRegistry(),
        agent_supervisor=supervisor,
    )
    return TestClient(app), supervisor
|
||||
|
||||
|
||||
def test_allocate_returns_allocation_id_and_url(alloc_client):
    """Happy path: allocation returns an id, the chosen node, and the service URL."""
    client, sup = alloc_client
    with patch("httpx.AsyncClient") as mock_http:
        agent_reply = MagicMock()
        agent_reply.is_success = True
        agent_reply.json.return_value = {"running": True, "url": "http://heimdall:8000"}
        mock_http.return_value.__aenter__.return_value.post = AsyncMock(return_value=agent_reply)

        body = {
            "model_candidates": ["Ouro-1.4B"],
            "ttl_s": 300.0,
            "caller": "test",
        }
        resp = client.post("/api/services/vllm/allocate", json=body)

        assert resp.status_code == 200
        data = resp.json()
        assert "allocation_id" in data
        assert data["service"] == "vllm"
        assert data["node_id"] == "heimdall"
        assert data["url"] == "http://heimdall:8000"
||||
|
||||
|
||||
def test_allocate_returns_503_when_no_online_nodes(alloc_client):
    """With zero online agents, automatic placement fails with 503."""
    client, sup = alloc_client
    sup.online_agents.return_value = {}
    body = {"model_candidates": ["Ouro-1.4B"]}
    resp = client.post("/api/services/vllm/allocate", json=body)
    assert resp.status_code == 503
|
||||
|
||||
|
||||
def test_allocate_returns_422_for_empty_candidates(alloc_client):
    """An empty model_candidates list is rejected up front with 422."""
    client, _ = alloc_client
    body = {"model_candidates": []}
    resp = client.post("/api/services/vllm/allocate", json=body)
    assert resp.status_code == 422
|
||||
|
||||
|
||||
def test_allocate_returns_422_for_unknown_service(alloc_client):
    """A service name absent from every profile is rejected with 422."""
    client, _ = alloc_client
    body = {"model_candidates": ["x"]}
    resp = client.post("/api/services/cf-made-up/allocate", json=body)
    assert resp.status_code == 422
|
||||
Loading…
Reference in a new issue