diff --git a/circuitforge_core/resources/coordinator/app.py b/circuitforge_core/resources/coordinator/app.py
index abf333f..3bc6eb5 100644
--- a/circuitforge_core/resources/coordinator/app.py
+++ b/circuitforge_core/resources/coordinator/app.py
@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+import uuid as _uuid
 from contextlib import asynccontextmanager
 from pathlib import Path
 from typing import Any
@@ -11,6 +12,7 @@ from pydantic import BaseModel
 from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor
 from circuitforge_core.resources.coordinator.eviction_engine import EvictionEngine
 from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
+from circuitforge_core.resources.coordinator.node_selector import select_node
 from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
 
 _DASHBOARD_HTML = (Path(__file__).parent / "dashboard.html").read_text()
@@ -30,6 +32,24 @@ class NodeRegisterRequest(BaseModel):
     agent_url: str  # e.g. "http://10.1.10.71:7701"
 
 
+class ServiceEnsureRequest(BaseModel):
+    node_id: str
+    gpu_id: int = 0
+    params: dict[str, str] = {}
+    ttl_s: float = 3600.0
+    # Ordered list of model names to try; falls back down the list if VRAM is tight.
+    # The "model" key in params is used if this list is empty.
+    model_candidates: list[str] = []
+
+
+class ServiceAllocateRequest(BaseModel):
+    model_candidates: list[str] = []
+    gpu_id: int | None = None
+    params: dict[str, str] = {}
+    ttl_s: float = 3600.0
+    caller: str = ""
+
+
 def create_coordinator_app(
     lease_manager: LeaseManager,
     profile_registry: ProfileRegistry,
@@ -95,6 +115,20 @@ def create_coordinator_app(
             ]
         }
 
+    @app.get("/api/resident")
+    def get_residents() -> dict[str, Any]:
+        return {
+            "residents": [
+                {
+                    "service": r.service,
+                    "node_id": r.node_id,
+                    "model_name": r.model_name,
+                    "first_seen": r.first_seen,
+                }
+                for r in lease_manager.all_residents()
+            ]
+        }
+
     @app.get("/api/leases")
     def get_leases() -> dict[str, Any]:
         return {
@@ -155,4 +189,159 @@ def create_coordinator_app(
             raise HTTPException(status_code=404, detail=f"Lease {lease_id!r} not found")
         return {"released": True, "lease_id": lease_id}
 
+    @app.post("/api/services/{service}/ensure")
+    async def ensure_service(service: str, req: ServiceEnsureRequest) -> dict[str, Any]:
+        """
+        Ensure a managed service is running on the given node.
+
+        Tries each candidate model in order until the agent starts one successfully.
+        (VRAM pre-flight filtering is sketched below but not yet applied — see the
+        TODO.) The selected model is returned in the response.
+        """
+        import httpx
+
+        node_info = agent_supervisor.get_node_info(req.node_id)
+        if node_info is None:
+            raise HTTPException(422, detail=f"Unknown node_id {req.node_id!r}")
+
+        # Resolve candidate list — fall back to params["model"] if not specified.
+        candidates: list[str] = req.model_candidates or (
+            [req.params["model"]] if "model" in req.params else []
+        )
+        if not candidates:
+            raise HTTPException(422, detail="No model specified: set params.model or model_candidates")
+
+        # Live free VRAM on the target GPU (intended for pre-flight filtering).
+        gpu = next((g for g in node_info.gpus if g.gpu_id == req.gpu_id), None)
+        free_mb = gpu.vram_free_mb if gpu else 0
+
+        # TODO(review): free_mb and service_max_mb are computed but never used —
+        # the intended pre-flight filter (skip a model when free_mb is below
+        # ~half of the service's max_mb, since a loaded model needs ~50-80% of
+        # its param size in VRAM) is not yet wired into the loop below.
+        service_max_mb = 0
+        for p in profile_registry.list_public():
+            svc = p.services.get(service)
+            if svc:
+                service_max_mb = svc.max_mb
+                break
+
+        last_error: str = ""
+        async with httpx.AsyncClient(timeout=120.0) as client:
+            for model in candidates:
+                params_with_model = {**req.params, "model": model}
+                try:
+                    start_resp = await client.post(
+                        f"{node_info.agent_url}/services/{service}/start",
+                        json={"gpu_id": req.gpu_id, "params": params_with_model},
+                    )
+                    if start_resp.is_success:
+                        data = start_resp.json()
+                        return {
+                            "service": service,
+                            "node_id": req.node_id,
+                            "gpu_id": req.gpu_id,
+                            "model": model,
+                            "url": data.get("url"),
+                            "running": data.get("running", False),
+                        }
+                    last_error = start_resp.text
+                except httpx.HTTPError as exc:
+                    raise HTTPException(502, detail=f"Agent unreachable: {exc}")
+
+        raise HTTPException(
+            503,
+            detail=f"All model candidates exhausted for {service!r}. Last error: {last_error}",
+        )
+
+    @app.post("/api/services/{service}/allocate")
+    async def allocate_service(service: str, req: ServiceAllocateRequest) -> dict[str, Any]:
+        """
+        Allocate a managed service — coordinator picks the best node automatically.
+        Returns a URL + allocation_id. (Allocation not tracked server-side until Phase 2.)
+        """
+        import httpx
+
+        if not req.model_candidates:
+            raise HTTPException(422, detail="model_candidates must be non-empty")
+
+        if req.gpu_id is None:
+            # Validate the service is known before attempting node selection.
+            known = any(
+                service in p.services
+                for p in profile_registry.list_public()
+            )
+            if not known:
+                raise HTTPException(422, detail=f"Unknown service {service!r} — not in any profile")
+
+            online = agent_supervisor.online_agents()
+            placement = select_node(online, service, profile_registry, lease_manager.resident_keys())
+            if placement is None:
+                raise HTTPException(
+                    503,
+                    detail=f"No online node has capacity for service {service!r}",
+                )
+            node_id, gpu_id = placement
+        else:
+            online = agent_supervisor.online_agents()
+            node_id = next(
+                (nid for nid, rec in online.items()
+                 if any(g.gpu_id == req.gpu_id for g in rec.gpus)),
+                None,
+            )
+            if node_id is None:
+                raise HTTPException(422, detail=f"No online node has gpu_id={req.gpu_id}")
+            gpu_id = req.gpu_id
+
+        node_info = agent_supervisor.get_node_info(node_id)
+        if node_info is None:
+            raise HTTPException(422, detail=f"Node {node_id!r} not found")
+
+        warm = f"{node_id}:{service}" in lease_manager.resident_keys()
+
+        async with httpx.AsyncClient(timeout=120.0) as client:
+            last_error = ""
+            for model in req.model_candidates:
+                try:
+                    resp = await client.post(
+                        f"{node_info.agent_url}/services/{service}/start",
+                        json={"gpu_id": gpu_id, "params": {**req.params, "model": model}},
+                    )
+                    if resp.is_success:
+                        data = resp.json()
+                        return {
+                            "allocation_id": str(_uuid.uuid4()),
+                            "service": service,
+                            "node_id": node_id,
+                            "gpu_id": gpu_id,
+                            "model": model,
+                            "url": data.get("url"),
+                            "started": not warm,
+                            "warm": warm,
+                        }
+                    last_error = resp.text
+                except httpx.HTTPError as exc:
+                    raise HTTPException(502, detail=f"Agent unreachable: {exc}")
+
+        raise HTTPException(
+            503,
+            detail=f"All model candidates exhausted for {service!r}. Last error: {last_error}",
+        )
+
+    @app.delete("/api/services/{service}")
+    async def stop_service(service: str, node_id: str) -> dict[str, Any]:
+        """Stop a managed service on the given node."""
+        node_info = agent_supervisor.get_node_info(node_id)
+        if node_info is None:
+            raise HTTPException(422, detail=f"Unknown node_id {node_id!r}")
+
+        import httpx
+        async with httpx.AsyncClient(timeout=30.0) as client:
+            try:
+                resp = await client.post(f"{node_info.agent_url}/services/{service}/stop")
+                resp.raise_for_status()
+                return {"service": service, "node_id": node_id, "stopped": resp.json().get("stopped", False)}
+            except httpx.HTTPError as exc:
+                raise HTTPException(502, detail=f"Agent unreachable: {exc}")
+
     return app
diff --git a/tests/test_resources/test_coordinator_allocate.py b/tests/test_resources/test_coordinator_allocate.py
new file mode 100644
index 0000000..bab3714
--- /dev/null
+++ b/tests/test_resources/test_coordinator_allocate.py
@@ -0,0 +1,73 @@
+import pytest
+from unittest.mock import AsyncMock, MagicMock, patch
+from fastapi.testclient import TestClient
+from circuitforge_core.resources.coordinator.app import create_coordinator_app
+from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
+from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
+from circuitforge_core.resources.coordinator.agent_supervisor import AgentRecord
+from circuitforge_core.resources.models import GpuInfo, NodeInfo
+
+
+def _make_supervisor_mock(online: bool = True):
+    sup = MagicMock()
+    record = AgentRecord(node_id="heimdall", agent_url="http://heimdall:7701")
+    record.gpus = [GpuInfo(0, "RTX 4000", 8192, 0, 8192)]
+    record.online = online
+    sup.online_agents.return_value = {"heimdall": record} if online else {}
+    sup.get_node_info.return_value = NodeInfo(
+        node_id="heimdall",
+        agent_url="http://heimdall:7701",
+        gpus=record.gpus,
+        last_heartbeat=0.0,
+    )
+    return sup
+
+
+@pytest.fixture
+def alloc_client():
+    lm = LeaseManager()
+    pr = ProfileRegistry()
+    sup = _make_supervisor_mock()
+    app = create_coordinator_app(lease_manager=lm, profile_registry=pr, agent_supervisor=sup)
+    return TestClient(app), sup
+
+
+def test_allocate_returns_allocation_id_and_url(alloc_client):
+    client, sup = alloc_client
+    with patch("httpx.AsyncClient") as mock_http:
+        mock_resp = MagicMock()
+        mock_resp.is_success = True
+        mock_resp.json.return_value = {"running": True, "url": "http://heimdall:8000"}
+        mock_http.return_value.__aenter__.return_value.post = AsyncMock(return_value=mock_resp)
+
+        resp = client.post("/api/services/vllm/allocate", json={
+            "model_candidates": ["Ouro-1.4B"],
+            "ttl_s": 300.0,
+            "caller": "test",
+        })
+
+    assert resp.status_code == 200
+    data = resp.json()
+    assert "allocation_id" in data
+    assert data["service"] == "vllm"
+    assert data["node_id"] == "heimdall"
+    assert data["url"] == "http://heimdall:8000"
+
+
+def test_allocate_returns_503_when_no_online_nodes(alloc_client):
+    client, sup = alloc_client
+    sup.online_agents.return_value = {}
+    resp = client.post("/api/services/vllm/allocate", json={"model_candidates": ["Ouro-1.4B"]})
+    assert resp.status_code == 503
+
+
+def test_allocate_returns_422_for_empty_candidates(alloc_client):
+    client, _ = alloc_client
+    resp = client.post("/api/services/vllm/allocate", json={"model_candidates": []})
+    assert resp.status_code == 422
+
+
+def test_allocate_returns_422_for_unknown_service(alloc_client):
+    client, _ = alloc_client
+    resp = client.post("/api/services/cf-made-up/allocate", json={"model_candidates": ["x"]})
+    assert resp.status_code == 422