feat(resources): cf-orch GPU VRAM orchestration — Plan A core #1
2 changed files with 226 additions and 0 deletions
124
circuitforge_core/resources/coordinator/app.py
Normal file
124
circuitforge_core/resources/coordinator/app.py
Normal file
|
|
@@ -0,0 +1,124 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from fastapi import FastAPI, HTTPException
|
||||
from pydantic import BaseModel
|
||||
|
||||
from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor
|
||||
from circuitforge_core.resources.coordinator.eviction_engine import EvictionEngine
|
||||
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
|
||||
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
|
||||
|
||||
|
||||
class LeaseRequest(BaseModel):
    """Request body for POST /api/leases: asks the coordinator for a VRAM lease."""

    node_id: str  # target node id, as known to the agent supervisor
    gpu_id: int  # GPU index on that node
    mb: int  # requested VRAM, in megabytes
    service: str  # name of the service that will hold the lease
    priority: int = 2  # NOTE(review): presumably lower = more important — confirm against EvictionEngine
    ttl_s: float = 0.0  # lease time-to-live in seconds; 0.0 likely means "no expiry" — TODO confirm
|
||||
|
||||
|
||||
def create_coordinator_app(
    lease_manager: LeaseManager,
    profile_registry: ProfileRegistry,
    agent_supervisor: AgentSupervisor,
) -> FastAPI:
    """Build the coordinator FastAPI app wired to the given collaborators.

    Args:
        lease_manager: Tracks active VRAM leases and performs releases.
        profile_registry: Source of the public profiles served on /api/profiles.
        agent_supervisor: Knows registered nodes, their GPUs and agent URLs.

    Returns:
        A configured FastAPI application exposing the coordinator HTTP API.
    """
    eviction_engine = EvictionEngine(lease_manager=lease_manager)

    app = FastAPI(title="cforch-coordinator")

    def _lease_dict(lease: Any) -> dict[str, Any]:
        # Single serialization point for leases so GET /api/leases and
        # POST /api/leases cannot drift apart (previously duplicated inline).
        return {
            "lease_id": lease.lease_id,
            "node_id": lease.node_id,
            "gpu_id": lease.gpu_id,
            "mb_granted": lease.mb_granted,
            "holder_service": lease.holder_service,
            "priority": lease.priority,
            "expires_at": lease.expires_at,
        }

    @app.get("/api/health")
    def health() -> dict[str, Any]:
        """Liveness probe."""
        return {"status": "ok"}

    @app.get("/api/nodes")
    def get_nodes() -> dict[str, Any]:
        """Return every known node with a snapshot of its GPUs."""
        nodes = agent_supervisor.all_nodes()
        return {
            "nodes": [
                {
                    "node_id": n.node_id,
                    "agent_url": n.agent_url,
                    "last_heartbeat": n.last_heartbeat,
                    "gpus": [
                        {
                            "gpu_id": g.gpu_id,
                            "name": g.name,
                            "vram_total_mb": g.vram_total_mb,
                            "vram_used_mb": g.vram_used_mb,
                            "vram_free_mb": g.vram_free_mb,
                        }
                        for g in n.gpus
                    ],
                }
                for n in nodes
            ]
        }

    @app.get("/api/profiles")
    def get_profiles() -> dict[str, Any]:
        """List the publicly visible GPU profiles."""
        return {
            "profiles": [
                {"name": p.name, "vram_total_mb": p.vram_total_mb}
                for p in profile_registry.list_public()
            ]
        }

    @app.get("/api/leases")
    def get_leases() -> dict[str, Any]:
        """Return all currently active leases."""
        return {"leases": [_lease_dict(lease) for lease in lease_manager.all_leases()]}

    @app.post("/api/leases")
    async def request_lease(req: LeaseRequest) -> dict[str, Any]:
        """Grant a VRAM lease via the eviction engine.

        Raises:
            HTTPException: 503 when the engine cannot satisfy the request.
        """
        node_info = agent_supervisor.get_node_info(req.node_id)
        # NOTE(review): falls back to a hard-coded local agent URL when the
        # node is unknown — presumably a dev convenience; confirm intended.
        agent_url = node_info.agent_url if node_info else "http://localhost:7701"

        lease = await eviction_engine.request_lease(
            node_id=req.node_id,
            gpu_id=req.gpu_id,
            mb=req.mb,
            service=req.service,
            priority=req.priority,
            agent_url=agent_url,
            ttl_s=req.ttl_s,
        )
        if lease is None:
            raise HTTPException(
                status_code=503,
                detail="Insufficient VRAM — no eviction candidates available",
            )
        return {"lease": _lease_dict(lease)}

    @app.delete("/api/leases/{lease_id}")
    async def release_lease(lease_id: str) -> dict[str, Any]:
        """Release a lease by id; 404 if it is not (or no longer) held."""
        released = await lease_manager.release(lease_id)
        if not released:
            raise HTTPException(status_code=404, detail=f"Lease {lease_id!r} not found")
        return {"released": True, "lease_id": lease_id}

    return app
|
||||
102
tests/test_resources/test_coordinator_app.py
Normal file
102
tests/test_resources/test_coordinator_app.py
Normal file
|
|
@@ -0,0 +1,102 @@
|
|||
import pytest
|
||||
from unittest.mock import MagicMock
|
||||
from fastapi.testclient import TestClient
|
||||
from circuitforge_core.resources.coordinator.app import create_coordinator_app
|
||||
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
|
||||
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
|
||||
from circuitforge_core.resources.models import GpuInfo, NodeInfo
|
||||
|
||||
|
||||
@pytest.fixture
def coordinator_client():
    """TestClient over a real LeaseManager with a mocked agent supervisor."""
    manager = LeaseManager()
    manager.register_gpu("heimdall", 0, 8192)
    registry = ProfileRegistry()

    mock_supervisor = MagicMock()
    heimdall_gpu = GpuInfo(
        gpu_id=0,
        name="RTX 4000",
        vram_total_mb=8192,
        vram_used_mb=0,
        vram_free_mb=8192,
    )
    mock_supervisor.all_nodes.return_value = [
        NodeInfo(
            node_id="heimdall",
            agent_url="http://localhost:7701",
            gpus=[heimdall_gpu],
            last_heartbeat=0.0,
        )
    ]
    mock_supervisor.get_node_info.return_value = NodeInfo(
        node_id="heimdall",
        agent_url="http://localhost:7701",
        gpus=[],
        last_heartbeat=0.0,
    )

    app = create_coordinator_app(
        lease_manager=manager,
        profile_registry=registry,
        agent_supervisor=mock_supervisor,
    )
    return TestClient(app), manager
|
||||
|
||||
|
||||
def test_health_returns_ok(coordinator_client):
    """The health endpoint answers 200 with status ok."""
    client, _ = coordinator_client
    response = client.get("/api/health")
    assert response.status_code == 200
    body = response.json()
    assert body["status"] == "ok"
|
||||
|
||||
|
||||
def test_get_nodes_returns_list(coordinator_client):
    """GET /api/nodes exposes the single mocked node."""
    client, _ = coordinator_client
    response = client.get("/api/nodes")
    assert response.status_code == 200
    node_list = response.json()["nodes"]
    assert len(node_list) == 1
    first_node = node_list[0]
    assert first_node["node_id"] == "heimdall"
|
||||
|
||||
|
||||
def test_get_profiles_returns_public_profiles(coordinator_client):
    """The profiles endpoint includes the default public profile."""
    client, _ = coordinator_client
    response = client.get("/api/profiles")
    assert response.status_code == 200
    profile_names = [entry["name"] for entry in response.json()["profiles"]]
    assert "single-gpu-8gb" in profile_names
|
||||
|
||||
|
||||
def test_post_lease_grants_lease(coordinator_client):
    """POST /api/leases grants the requested amount to the named service."""
    client, _ = coordinator_client
    payload = {
        "node_id": "heimdall",
        "gpu_id": 0,
        "mb": 2048,
        "service": "peregrine",
        "priority": 1,
    }
    response = client.post("/api/leases", json=payload)
    assert response.status_code == 200
    lease = response.json()["lease"]
    assert lease["mb_granted"] == 2048
    assert lease["holder_service"] == "peregrine"
    assert "lease_id" in lease
|
||||
|
||||
|
||||
def test_delete_lease_releases_it(coordinator_client):
    """A granted lease can be released via DELETE."""
    client, _ = coordinator_client
    payload = {
        "node_id": "heimdall",
        "gpu_id": 0,
        "mb": 2048,
        "service": "peregrine",
        "priority": 1,
    }
    create_response = client.post("/api/leases", json=payload)
    granted_id = create_response.json()["lease"]["lease_id"]
    delete_response = client.delete(f"/api/leases/{granted_id}")
    assert delete_response.status_code == 200
    assert delete_response.json()["released"] is True
|
||||
|
||||
|
||||
def test_delete_unknown_lease_returns_404(coordinator_client):
    """Deleting a lease id that was never granted yields 404."""
    client, _ = coordinator_client
    response = client.delete("/api/leases/nonexistent-id")
    assert response.status_code == 404
|
||||
|
||||
|
||||
def test_get_leases_returns_active_leases(coordinator_client):
    """GET /api/leases lists a lease created just before."""
    client, _ = coordinator_client
    payload = {
        "node_id": "heimdall",
        "gpu_id": 0,
        "mb": 1024,
        "service": "kiwi",
        "priority": 2,
    }
    client.post("/api/leases", json=payload)
    response = client.get("/api/leases")
    assert response.status_code == 200
    active = response.json()["leases"]
    assert len(active) == 1
|
||||
Loading…
Reference in a new issue