feat: implement GET /api/nodes-mgmt/nodes with coordinator proxy and profile merge

This commit is contained in:
pyr0ball 2026-05-05 20:16:06 -07:00
parent c039ea4698
commit c2de9e53da
2 changed files with 197 additions and 6 deletions

View file

@ -105,12 +105,84 @@ def _get_ollama_url(node_id: str) -> str:
@router.get("/nodes")
def list_nodes() -> list:
    """Return all coordinator nodes with live GPU stats merged with profile YAML.

    The coordinator URL is read from the loaded config. Two requests are made
    against it, in this order: ``/api/nodes`` (required) and ``/api/services``
    (best-effort, used only to fill ``services_running``).

    Returns:
        A list with one dict per coordinator node, with the keys ``node_id``,
        ``online``, ``agent_url``, ``gpus``, ``profile_loaded`` and
        ``services_catalog``. An empty list is returned when no
        ``coordinator_url`` is configured, when the coordinator cannot be
        reached, or when its ``/api/nodes`` payload is not a JSON list.
    """
    import httpx

    cfg = _load_config()
    # Strip a trailing slash so the joined URL never contains "//api/...".
    coordinator_url = (cfg.get("coordinator_url", "") or "").rstrip("/")
    if not coordinator_url:
        return []

    try:
        nodes_resp = httpx.get(f"{coordinator_url}/api/nodes", timeout=5.0)
        nodes_resp.raise_for_status()
        coord_nodes = nodes_resp.json()
    except (httpx.HTTPError, httpx.InvalidURL, ValueError) as exc:
        # httpx.HTTPError already covers ConnectError; ValueError covers a
        # body that is not valid JSON; InvalidURL covers a malformed config
        # value. None of these should turn into a 500 for the caller.
        logger.warning("Coordinator unreachable: %s", exc)
        return []
    if not isinstance(coord_nodes, list):
        logger.warning(
            "Coordinator /api/nodes returned %s, expected a list",
            type(coord_nodes).__name__,
        )
        return []

    try:
        services_resp = httpx.get(f"{coordinator_url}/api/services", timeout=5.0)
        services_resp.raise_for_status()
        services_data = services_resp.json()
    except Exception as exc:
        # Deliberately best-effort: node and GPU stats are still useful
        # without the running-services overlay, but leave a trace in the log.
        logger.warning("Could not fetch coordinator services: %s", exc)
        services_data = []

    running = _running_services_by_gpu(services_data)

    result = []
    for node in coord_nodes:
        if not isinstance(node, dict):
            continue  # malformed entry; skip it rather than fail the listing
        node_id = node.get("node_id", "") or node.get("id", "")
        profile = _load_profile(node_id) if node_id else None
        result.append({
            "node_id": node_id,
            "online": node.get("online", True),
            "agent_url": node.get("agent_url", ""),
            "gpus": _merge_node_gpus(node, node_id, profile, running),
            "profile_loaded": profile is not None,
            "services_catalog": _profile_catalog_summary(profile),
        })
    return result


def _running_services_by_gpu(services_data) -> dict:
    """Index running service names as ``{node_id: {gpu_id: [service, ...]}}``."""
    running: dict = {}
    if not isinstance(services_data, list):
        return running
    for svc in services_data:
        if not isinstance(svc, dict):
            continue
        nid = svc.get("node_id", "")
        gid = svc.get("gpu_id")
        svc_name = svc.get("service", "")
        # gpu_id 0 is valid, so test against None rather than truthiness.
        if nid and gid is not None and svc_name:
            running.setdefault(nid, {}).setdefault(gid, []).append(svc_name)
    return running


def _merge_node_gpus(node: dict, node_id: str, profile, running: dict) -> list:
    """Build the per-GPU dicts for one node: live stats plus assigned/running services."""
    profile_gpus: list = []
    if profile:
        # Resolve this node's profile entry once instead of once per GPU.
        node_entry = (profile.get("nodes", {}) or {}).get(node_id, {}) or {}
        profile_gpus = [
            g for g in (node_entry.get("gpus", []) or []) if isinstance(g, dict)
        ]
    running_here = running.get(node_id, {})

    merged = []
    for gpu in node.get("gpus", []) or []:
        if not isinstance(gpu, dict):
            continue
        gpu_id = gpu.get("gpu_id", gpu.get("id", 0))
        # The first profile GPU whose "id" matches wins, as before.
        services_assigned = next(
            (g.get("services", []) or [] for g in profile_gpus if g.get("id") == gpu_id),
            [],
        )
        merged.append({
            "gpu_id": gpu_id,
            "card": gpu.get("card", ""),
            "vram_total_mb": gpu.get("vram_total_mb", 0),
            "vram_used_mb": gpu.get("vram_used_mb", 0),
            "vram_free_mb": gpu.get("vram_free_mb", 0),
            "temp_c": gpu.get("temp_c"),
            "utilization_pct": gpu.get("utilization_pct"),
            "compute_cap": gpu.get("compute_cap"),
            "services_assigned": services_assigned,
            "services_running": running_here.get(gpu_id, []),
        })
    return merged


def _profile_catalog_summary(profile) -> dict:
    """Summarise each profile service as min_compute_cap, max_mb and catalog size."""
    summary: dict = {}
    if not profile:
        return summary
    for svc_name, svc_info in (profile.get("services", {}) or {}).items():
        if not isinstance(svc_info, dict):
            # A bare "name:" key in YAML loads as None; fall back to defaults.
            svc_info = {}
        catalog = svc_info.get("catalog", {}) or {}
        summary[svc_name] = {
            "min_compute_cap": svc_info.get("min_compute_cap", 0.0),
            "max_mb": svc_info.get("max_mb", 0),
            "catalog_size": len(catalog),
        }
    return summary

View file

@ -45,3 +45,122 @@ def test_list_nodes_returns_empty_when_no_coordinator(client):
r = client.get("/api/nodes-mgmt/nodes") r = client.get("/api/nodes-mgmt/nodes")
assert r.status_code == 200 assert r.status_code == 200
assert r.json() == [] assert r.json() == []
from unittest.mock import MagicMock, patch
def _fake_nodes_response(nodes_json: list, services_json: list | None = None):
"""Build side_effect list for two httpx.get calls: nodes then services."""
mock_nodes = MagicMock()
mock_nodes.raise_for_status = MagicMock()
mock_nodes.json.return_value = nodes_json
mock_services = MagicMock()
mock_services.raise_for_status = MagicMock()
mock_services.json.return_value = services_json or []
return [mock_nodes, mock_services]
def test_list_nodes_coordinator_unreachable_returns_empty(client, tmp_path):
    """A refused coordinator connection yields 200 with an empty list, not a 500."""
    import httpx

    _write_config(tmp_path, {"coordinator_url": "http://fake-coord:7700"})
    refused = httpx.ConnectError("refused")

    # Every httpx.get call made by the endpoint raises the connection error.
    with patch("httpx.get", side_effect=refused):
        response = client.get("/api/nodes-mgmt/nodes")

    assert response.status_code == 200
    assert response.json() == []
def test_list_nodes_merges_profile_data(client, tmp_path):
    """Live GPU stats from the coordinator are merged with the profile YAML."""
    profiles_dir = tmp_path / "profiles"
    _write_config(tmp_path, {
        "coordinator_url": "http://fake-coord:7700",
        "profiles_dir": str(profiles_dir),
    })

    # Profile: one service in the catalog, assigned to GPU 0 of "heimdall".
    profile_gpu = {
        "id": 0, "vram_mb": 24576, "compute_cap": 8.6,
        "services": ["cf-text"], "role": "primary", "card": "RTX 3090",
        "always_on": True,
    }
    _write_profile(profiles_dir, "heimdall", {
        "services": {
            "cf-text": {"min_compute_cap": 7.0, "max_mb": 8192, "catalog": {}},
        },
        "nodes": {
            "heimdall": {
                "gpus": [profile_gpu],
                "agent_url": "http://10.1.10.71:7701",
            },
        },
    })

    # Coordinator: the same node reporting live stats for GPU 0.
    live_gpu = {
        "gpu_id": 0, "card": "RTX 3090", "vram_total_mb": 24576,
        "vram_used_mb": 4096, "vram_free_mb": 20480,
        "temp_c": 42.0, "utilization_pct": 15.0, "compute_cap": 8.6,
    }
    coord_nodes = [{
        "node_id": "heimdall",
        "online": True,
        "agent_url": "http://10.1.10.71:7701",
        "gpus": [live_gpu],
    }]

    with patch("httpx.get", side_effect=_fake_nodes_response(coord_nodes)):
        response = client.get("/api/nodes-mgmt/nodes")

    assert response.status_code == 200
    payload = response.json()
    assert len(payload) == 1

    merged = payload[0]
    first_gpu = merged["gpus"][0]
    assert merged["node_id"] == "heimdall"
    assert merged["profile_loaded"] is True
    assert first_gpu["services_assigned"] == ["cf-text"]
    assert first_gpu["vram_total_mb"] == 24576
    assert "cf-text" in merged["services_catalog"]
def test_list_nodes_no_profile_returns_profile_loaded_false(client, tmp_path):
    """With no profile YAML, profile_loaded is false and GPU stats are still returned."""
    _write_config(tmp_path, {"coordinator_url": "http://fake-coord:7700"})

    live_gpu = {
        "gpu_id": 0, "card": "RTX 5060 Ti", "vram_total_mb": 16384,
        "vram_used_mb": 0, "vram_free_mb": 16384,
        "temp_c": None, "utilization_pct": None, "compute_cap": 10.0,
    }
    coord_nodes = [{
        "node_id": "sif",
        "online": True,
        "agent_url": "http://10.1.10.158:7701",
        "gpus": [live_gpu],
    }]

    with patch("httpx.get", side_effect=_fake_nodes_response(coord_nodes)):
        response = client.get("/api/nodes-mgmt/nodes")

    assert response.status_code == 200
    first_node = response.json()[0]
    assert first_node["profile_loaded"] is False
    assert first_node["gpus"][0]["card"] == "RTX 5060 Ti"
    assert first_node["services_catalog"] == {}
def test_list_nodes_marks_running_services(client, tmp_path):
    """services_running is populated from the coordinator /api/services response."""
    profiles_dir = tmp_path / "profiles"
    _write_config(tmp_path, {
        "coordinator_url": "http://fake-coord:7700",
        "profiles_dir": str(profiles_dir),
    })
    _write_profile(profiles_dir, "heimdall", {
        "services": {},
        "nodes": {"heimdall": {"gpus": [{"id": 0, "vram_mb": 24576, "compute_cap": 8.6,
                                         "services": ["cf-text"], "role": "p",
                                         "card": "RTX 3090", "always_on": True}],
                               "agent_url": "http://10.1.10.71:7701"}},
    })
    coord_nodes = [{"node_id": "heimdall", "online": True,
                    "agent_url": "http://10.1.10.71:7701",
                    "gpus": [{"gpu_id": 0, "card": "RTX 3090", "vram_total_mb": 24576,
                              "vram_used_mb": 8192, "vram_free_mb": 16384,
                              "temp_c": 55.0, "utilization_pct": 80.0, "compute_cap": 8.6}]}]
    # One running service reported for the same node and GPU.
    coord_services = [{"service": "cf-text", "node_id": "heimdall", "gpu_id": 0}]

    with patch("httpx.get", side_effect=_fake_nodes_response(coord_nodes, coord_services)):
        r = client.get("/api/nodes-mgmt/nodes")

    # Check the status first, as the sibling tests do, so a server error is
    # reported as such instead of as an index/key error on the payload.
    assert r.status_code == 200
    data = r.json()
    assert len(data) == 1
    assert data[0]["gpus"][0]["services_running"] == ["cf-text"]