From 4bcd297b182760d3ccb4b71e3dadcca1810d20ad Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Mon, 30 Mar 2026 22:01:46 -0700 Subject: [PATCH] feat(resources): add cforch-coordinator FastAPI app with lease/node/profile endpoints --- .../resources/coordinator/app.py | 124 ++++++++++++++++++ tests/test_resources/test_coordinator_app.py | 102 ++++++++++++++ 2 files changed, 226 insertions(+) create mode 100644 circuitforge_core/resources/coordinator/app.py create mode 100644 tests/test_resources/test_coordinator_app.py diff --git a/circuitforge_core/resources/coordinator/app.py b/circuitforge_core/resources/coordinator/app.py new file mode 100644 index 0000000..cd51b4b --- /dev/null +++ b/circuitforge_core/resources/coordinator/app.py @@ -0,0 +1,124 @@ +from __future__ import annotations + +from typing import Any + +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel + +from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor +from circuitforge_core.resources.coordinator.eviction_engine import EvictionEngine +from circuitforge_core.resources.coordinator.lease_manager import LeaseManager +from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry + + +class LeaseRequest(BaseModel): + node_id: str + gpu_id: int + mb: int + service: str + priority: int = 2 + ttl_s: float = 0.0 + + +def create_coordinator_app( + lease_manager: LeaseManager, + profile_registry: ProfileRegistry, + agent_supervisor: AgentSupervisor, +) -> FastAPI: + eviction_engine = EvictionEngine(lease_manager=lease_manager) + + app = FastAPI(title="cforch-coordinator") + + @app.get("/api/health") + def health() -> dict[str, Any]: + return {"status": "ok"} + + @app.get("/api/nodes") + def get_nodes() -> dict[str, Any]: + nodes = agent_supervisor.all_nodes() + return { + "nodes": [ + { + "node_id": n.node_id, + "agent_url": n.agent_url, + "last_heartbeat": n.last_heartbeat, + "gpus": [ + { + "gpu_id": g.gpu_id, + "name": g.name, + 
"vram_total_mb": g.vram_total_mb, + "vram_used_mb": g.vram_used_mb, + "vram_free_mb": g.vram_free_mb, + } + for g in n.gpus + ], + } + for n in nodes + ] + } + + @app.get("/api/profiles") + def get_profiles() -> dict[str, Any]: + return { + "profiles": [ + {"name": p.name, "vram_total_mb": p.vram_total_mb} + for p in profile_registry.list_public() + ] + } + + @app.get("/api/leases") + def get_leases() -> dict[str, Any]: + return { + "leases": [ + { + "lease_id": lease.lease_id, + "node_id": lease.node_id, + "gpu_id": lease.gpu_id, + "mb_granted": lease.mb_granted, + "holder_service": lease.holder_service, + "priority": lease.priority, + "expires_at": lease.expires_at, + } + for lease in lease_manager.all_leases() + ] + } + + @app.post("/api/leases") + async def request_lease(req: LeaseRequest) -> dict[str, Any]: + node_info = agent_supervisor.get_node_info(req.node_id) + agent_url = node_info.agent_url if node_info else "http://localhost:7701" + + lease = await eviction_engine.request_lease( + node_id=req.node_id, + gpu_id=req.gpu_id, + mb=req.mb, + service=req.service, + priority=req.priority, + agent_url=agent_url, + ttl_s=req.ttl_s, + ) + if lease is None: + raise HTTPException( + status_code=503, + detail="Insufficient VRAM — no eviction candidates available", + ) + return { + "lease": { + "lease_id": lease.lease_id, + "node_id": lease.node_id, + "gpu_id": lease.gpu_id, + "mb_granted": lease.mb_granted, + "holder_service": lease.holder_service, + "priority": lease.priority, + "expires_at": lease.expires_at, + } + } + + @app.delete("/api/leases/{lease_id}") + async def release_lease(lease_id: str) -> dict[str, Any]: + released = await lease_manager.release(lease_id) + if not released: + raise HTTPException(status_code=404, detail=f"Lease {lease_id!r} not found") + return {"released": True, "lease_id": lease_id} + + return app diff --git a/tests/test_resources/test_coordinator_app.py b/tests/test_resources/test_coordinator_app.py new file mode 100644 index 
"""Tests for the cforch-coordinator FastAPI application."""

import pytest
from unittest.mock import MagicMock

from fastapi.testclient import TestClient

from circuitforge_core.resources.coordinator.app import create_coordinator_app
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
from circuitforge_core.resources.models import GpuInfo, NodeInfo

# Fixture constants shared by the supervisor double and the lease requests.
_NODE = "heimdall"
_AGENT = "http://localhost:7701"


def _make_supervisor() -> MagicMock:
    """AgentSupervisor double reporting a single node with one 8 GiB GPU."""
    gpu = GpuInfo(
        gpu_id=0,
        name="RTX 4000",
        vram_total_mb=8192,
        vram_used_mb=0,
        vram_free_mb=8192,
    )
    supervisor = MagicMock()
    supervisor.all_nodes.return_value = [
        NodeInfo(node_id=_NODE, agent_url=_AGENT, gpus=[gpu], last_heartbeat=0.0)
    ]
    supervisor.get_node_info.return_value = NodeInfo(
        node_id=_NODE, agent_url=_AGENT, gpus=[], last_heartbeat=0.0
    )
    return supervisor


@pytest.fixture
def coordinator_client():
    """(TestClient, LeaseManager) pair against a freshly built app."""
    manager = LeaseManager()
    manager.register_gpu(_NODE, 0, 8192)
    app = create_coordinator_app(
        lease_manager=manager,
        profile_registry=ProfileRegistry(),
        agent_supervisor=_make_supervisor(),
    )
    return TestClient(app), manager


def _post_lease(client, *, mb, service, priority):
    """POST a lease request against the fixture node/GPU."""
    body = {
        "node_id": _NODE,
        "gpu_id": 0,
        "mb": mb,
        "service": service,
        "priority": priority,
    }
    return client.post("/api/leases", json=body)


def test_health_returns_ok(coordinator_client):
    client, _ = coordinator_client
    resp = client.get("/api/health")
    assert resp.status_code == 200
    assert resp.json()["status"] == "ok"


def test_get_nodes_returns_list(coordinator_client):
    client, _ = coordinator_client
    resp = client.get("/api/nodes")
    assert resp.status_code == 200
    nodes = resp.json()["nodes"]
    assert len(nodes) == 1
    assert nodes[0]["node_id"] == "heimdall"


def test_get_profiles_returns_public_profiles(coordinator_client):
    client, _ = coordinator_client
    resp = client.get("/api/profiles")
    assert resp.status_code == 200
    names = [entry["name"] for entry in resp.json()["profiles"]]
    assert "single-gpu-8gb" in names


def test_post_lease_grants_lease(coordinator_client):
    client, _ = coordinator_client
    resp = _post_lease(client, mb=2048, service="peregrine", priority=1)
    assert resp.status_code == 200
    lease = resp.json()["lease"]
    assert lease["mb_granted"] == 2048
    assert lease["holder_service"] == "peregrine"
    assert "lease_id" in lease


def test_delete_lease_releases_it(coordinator_client):
    client, _ = coordinator_client
    granted = _post_lease(client, mb=2048, service="peregrine", priority=1)
    lease_id = granted.json()["lease"]["lease_id"]
    del_resp = client.delete(f"/api/leases/{lease_id}")
    assert del_resp.status_code == 200
    assert del_resp.json()["released"] is True


def test_delete_unknown_lease_returns_404(coordinator_client):
    client, _ = coordinator_client
    resp = client.delete("/api/leases/nonexistent-id")
    assert resp.status_code == 404


def test_get_leases_returns_active_leases(coordinator_client):
    client, _ = coordinator_client
    _post_lease(client, mb=1024, service="kiwi", priority=2)
    resp = client.get("/api/leases")
    assert resp.status_code == 200
    assert len(resp.json()["leases"]) == 1