From 13eb0c85f1f305b352c67dff20024ed1591e5ac0 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Thu, 2 Apr 2026 11:18:44 -0700 Subject: [PATCH 01/15] =?UTF-8?q?feat(orch):=20add=20NodeSelector=20?= =?UTF-8?q?=E2=80=94=20warm-first=20GPU=20scoring?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../resources/coordinator/node_selector.py | 70 +++++++++++++++++++ tests/test_resources/test_node_selector.py | 56 +++++++++++++++ 2 files changed, 126 insertions(+) create mode 100644 circuitforge_core/resources/coordinator/node_selector.py create mode 100644 tests/test_resources/test_node_selector.py diff --git a/circuitforge_core/resources/coordinator/node_selector.py b/circuitforge_core/resources/coordinator/node_selector.py new file mode 100644 index 0000000..665cbb5 --- /dev/null +++ b/circuitforge_core/resources/coordinator/node_selector.py @@ -0,0 +1,70 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from circuitforge_core.resources.coordinator.agent_supervisor import AgentRecord + from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry + +_WARM_BONUS_MB = 1000 + + +@dataclass +class _Scored: + node_id: str + gpu_id: int + vram_free_mb: int + effective_free_mb: int + can_fit: bool + warm: bool + + +def select_node( + agents: "dict[str, AgentRecord]", + service: str, + profile_registry: "ProfileRegistry", + resident_keys: set[str], +) -> tuple[str, int] | None: + """ + Pick the best (node_id, gpu_id) for the requested service. + Warm nodes (service already running) get priority, then sorted by free VRAM. + Returns None if no suitable node exists. 
+ """ + candidates: list[_Scored] = [] + for node_id, record in agents.items(): + if not record.online: + continue + service_max_mb = _find_service_max_mb(service, profile_registry) + if service_max_mb is None: + continue + for gpu in record.gpus: + warm = f"{node_id}:{service}" in resident_keys + effective = gpu.vram_free_mb + (_WARM_BONUS_MB if warm else 0) + can_fit = gpu.vram_free_mb >= service_max_mb // 2 + candidates.append(_Scored( + node_id=node_id, + gpu_id=gpu.gpu_id, + vram_free_mb=gpu.vram_free_mb, + effective_free_mb=effective, + can_fit=can_fit, + warm=warm, + )) + if not candidates: + return None + # Warm nodes are always eligible (they already have the service resident). + # Cold nodes must pass the can_fit threshold. If no node passes either + # criterion, fall back to the full candidate set. + preferred = [c for c in candidates if c.warm or c.can_fit] + pool = preferred if preferred else candidates + # Warm nodes take priority; within the same warmth tier, prefer more free VRAM. 
+ best = max(pool, key=lambda c: (c.warm, c.effective_free_mb)) + return best.node_id, best.gpu_id + + +def _find_service_max_mb(service: str, profile_registry: "ProfileRegistry") -> int | None: + for profile in profile_registry.list_public(): + svc = profile.services.get(service) + if svc is not None: + return svc.max_mb + return None diff --git a/tests/test_resources/test_node_selector.py b/tests/test_resources/test_node_selector.py new file mode 100644 index 0000000..9e18a3a --- /dev/null +++ b/tests/test_resources/test_node_selector.py @@ -0,0 +1,56 @@ +import pytest +from circuitforge_core.resources.coordinator.node_selector import select_node +from circuitforge_core.resources.coordinator.agent_supervisor import AgentRecord +from circuitforge_core.resources.models import GpuInfo +from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry + + +def _make_agent(node_id: str, free_mb: int, online: bool = True) -> AgentRecord: + r = AgentRecord(node_id=node_id, agent_url=f"http://{node_id}:7701") + r.gpus = [GpuInfo(gpu_id=0, name="RTX", vram_total_mb=8192, + vram_used_mb=8192 - free_mb, vram_free_mb=free_mb)] + r.online = online + return r + + +def test_selects_node_with_most_free_vram(): + agents = { + "a": _make_agent("a", free_mb=2000), + "b": _make_agent("b", free_mb=6000), + } + registry = ProfileRegistry() + result = select_node(agents, "vllm", registry, resident_keys=set()) + assert result == ("b", 0) + + +def test_prefers_warm_node_even_with_less_free_vram(): + agents = { + "a": _make_agent("a", free_mb=2000), + "b": _make_agent("b", free_mb=6000), + } + registry = ProfileRegistry() + result = select_node(agents, "vllm", registry, resident_keys={"a:vllm"}) + assert result == ("a", 0) + + +def test_excludes_offline_nodes(): + agents = { + "a": _make_agent("a", free_mb=8000, online=False), + "b": _make_agent("b", free_mb=2000, online=True), + } + registry = ProfileRegistry() + result = select_node(agents, "vllm", registry, 
resident_keys=set()) + assert result == ("b", 0) + + +def test_returns_none_when_no_node_has_profile_for_service(): + agents = {"a": _make_agent("a", free_mb=8000)} + registry = ProfileRegistry() + result = select_node(agents, "cf-nonexistent-service", registry, resident_keys=set()) + assert result is None + + +def test_returns_none_when_no_agents(): + registry = ProfileRegistry() + result = select_node({}, "vllm", registry, resident_keys=set()) + assert result is None -- 2.45.2 From d600fb665161162794dc318eb4373a175108fe7e Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Thu, 2 Apr 2026 11:21:20 -0700 Subject: [PATCH 02/15] refactor(orch): hoist service_max_mb lookup; clarify warm-fallback comments --- .../resources/coordinator/node_selector.py | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/circuitforge_core/resources/coordinator/node_selector.py b/circuitforge_core/resources/coordinator/node_selector.py index 665cbb5..9cdb9f4 100644 --- a/circuitforge_core/resources/coordinator/node_selector.py +++ b/circuitforge_core/resources/coordinator/node_selector.py @@ -10,7 +10,7 @@ if TYPE_CHECKING: _WARM_BONUS_MB = 1000 -@dataclass +@dataclass(frozen=True) class _Scored: node_id: str gpu_id: int @@ -31,13 +31,14 @@ def select_node( Warm nodes (service already running) get priority, then sorted by free VRAM. Returns None if no suitable node exists. 
""" + service_max_mb = _find_service_max_mb(service, profile_registry) + if service_max_mb is None: + return None # service not in any profile + candidates: list[_Scored] = [] for node_id, record in agents.items(): if not record.online: continue - service_max_mb = _find_service_max_mb(service, profile_registry) - if service_max_mb is None: - continue for gpu in record.gpus: warm = f"{node_id}:{service}" in resident_keys effective = gpu.vram_free_mb + (_WARM_BONUS_MB if warm else 0) @@ -52,12 +53,15 @@ def select_node( )) if not candidates: return None - # Warm nodes are always eligible (they already have the service resident). - # Cold nodes must pass the can_fit threshold. If no node passes either - # criterion, fall back to the full candidate set. + # Prefer: (1) warm nodes (model already resident — no cold start) + # (2) cold nodes that can fit the service (free >= half of max_mb) + # Fallback: best-effort node when nothing fits and nothing is warm + # (coordinator will attempt to start the service anyway; it may evict or fail) + # Note: resident_keys are per-node, not per-GPU. On multi-GPU nodes, the warm + # bonus applies to all GPUs on the node. This is a known coarseness — + # per-GPU resident tracking requires a resident_key format change. preferred = [c for c in candidates if c.warm or c.can_fit] pool = preferred if preferred else candidates - # Warm nodes take priority; within the same warmth tier, prefer more free VRAM. 
best = max(pool, key=lambda c: (c.warm, c.effective_free_mb)) return best.node_id, best.gpu_id -- 2.45.2 From 52d2c5cf38edc84b6ac49dc6587b89a3aa2ee18b Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Thu, 2 Apr 2026 11:22:29 -0700 Subject: [PATCH 03/15] feat(orch): expose online_agents() and resident_keys() helpers --- .../resources/coordinator/agent_supervisor.py | 27 ++++++++++-- .../resources/coordinator/lease_manager.py | 44 ++++++++++++++++++- tests/test_resources/test_coordinator_app.py | 20 +++++++++ 3 files changed, 86 insertions(+), 5 deletions(-) diff --git a/circuitforge_core/resources/coordinator/agent_supervisor.py b/circuitforge_core/resources/coordinator/agent_supervisor.py index 63d92e7..5eb4f15 100644 --- a/circuitforge_core/resources/coordinator/agent_supervisor.py +++ b/circuitforge_core/resources/coordinator/agent_supervisor.py @@ -8,7 +8,7 @@ from dataclasses import dataclass, field import httpx from circuitforge_core.resources.coordinator.lease_manager import LeaseManager -from circuitforge_core.resources.models import GpuInfo, NodeInfo +from circuitforge_core.resources.models import GpuInfo, NodeInfo, ResidentAllocation logger = logging.getLogger(__name__) @@ -58,15 +58,27 @@ class AgentSupervisor: for r in self._agents.values() ] + def online_agents(self) -> "dict[str, AgentRecord]": + """Return only currently-online agents, keyed by node_id.""" + return {nid: rec for nid, rec in self._agents.items() if rec.online} + async def poll_agent(self, node_id: str) -> bool: record = self._agents.get(node_id) if record is None: return False try: async with httpx.AsyncClient(timeout=_AGENT_TIMEOUT_S) as client: - resp = await client.get(f"{record.agent_url}/gpu-info") - resp.raise_for_status() - data = resp.json() + gpu_resp = await client.get(f"{record.agent_url}/gpu-info") + gpu_resp.raise_for_status() + + # Resident-info is best-effort — older agents may not have the endpoint. 
+ try: + res_resp = await client.get(f"{record.agent_url}/resident-info") + resident_data = res_resp.json() if res_resp.is_success else {} + except Exception: + resident_data = {} + + data = gpu_resp.json() gpus = [ GpuInfo( gpu_id=g["gpu_id"], @@ -82,6 +94,13 @@ class AgentSupervisor: record.online = True for gpu in gpus: self._lease_manager.register_gpu(node_id, gpu.gpu_id, gpu.vram_total_mb) + + residents = [ + (r["service"], r.get("model_name")) + for r in resident_data.get("residents", []) + ] + self._lease_manager.set_residents_for_node(node_id, residents) + return True except Exception as exc: logger.warning("Agent %s unreachable: %s", node_id, exc) diff --git a/circuitforge_core/resources/coordinator/lease_manager.py b/circuitforge_core/resources/coordinator/lease_manager.py index 652b64b..80c7c65 100644 --- a/circuitforge_core/resources/coordinator/lease_manager.py +++ b/circuitforge_core/resources/coordinator/lease_manager.py @@ -3,7 +3,7 @@ from __future__ import annotations import asyncio from collections import defaultdict -from circuitforge_core.resources.models import VRAMLease +from circuitforge_core.resources.models import ResidentAllocation, VRAMLease class LeaseManager: @@ -12,6 +12,9 @@ class LeaseManager: self._gpu_total: dict[tuple[str, int], int] = {} self._gpu_used: dict[tuple[str, int], int] = defaultdict(int) self._lock = asyncio.Lock() + # Resident allocations — keyed "node_id:service", updated by heartbeat. + # No lock needed: only the single heartbeat task writes this dict. 
+ self._residents: dict[str, ResidentAllocation] = {} def register_gpu(self, node_id: str, gpu_id: int, total_mb: int) -> None: self._gpu_total[(node_id, gpu_id)] = total_mb @@ -86,3 +89,42 @@ class LeaseManager: def all_leases(self) -> list[VRAMLease]: return list(self._leases.values()) + + # ── resident tracking ──────────────────────────────────────────── + + def set_residents_for_node( + self, + node_id: str, + residents: list[tuple[str, str | None]], # (service, model_name) + ) -> None: + """ + Replace the resident snapshot for a node. + + Preserves first_seen for entries whose service+model_name are unchanged, + so the dashboard can show how long a model has been warm. + """ + new_keys = {f"{node_id}:{service}" for service, _ in residents} + + # Remove stale entries (service no longer running on this node). + for key in list(self._residents): + if key.startswith(f"{node_id}:") and key not in new_keys: + del self._residents[key] + + # Upsert: preserve first_seen when model is unchanged, reset otherwise. 
+ for service, model_name in residents: + key = f"{node_id}:{service}" + existing = self._residents.get(key) + if existing is not None and existing.model_name == model_name: + continue # same model still loaded — keep original first_seen + self._residents[key] = ResidentAllocation( + service=service, + node_id=node_id, + model_name=model_name, + ) + + def all_residents(self) -> list[ResidentAllocation]: + return list(self._residents.values()) + + def resident_keys(self) -> set[str]: + """Return set of 'node_id:service' strings for currently-warm services.""" + return set(self._residents.keys()) diff --git a/tests/test_resources/test_coordinator_app.py b/tests/test_resources/test_coordinator_app.py index e8f6bbe..48c40f6 100644 --- a/tests/test_resources/test_coordinator_app.py +++ b/tests/test_resources/test_coordinator_app.py @@ -2,6 +2,7 @@ import pytest from unittest.mock import MagicMock from fastapi.testclient import TestClient from circuitforge_core.resources.coordinator.app import create_coordinator_app +from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor from circuitforge_core.resources.coordinator.lease_manager import LeaseManager from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry from circuitforge_core.resources.models import GpuInfo, NodeInfo @@ -112,3 +113,22 @@ def test_dashboard_serves_html(coordinator_client): assert "cf-orch" in resp.text assert "/api/nodes" in resp.text assert "/api/leases" in resp.text + + +def test_online_agents_excludes_offline(): + lm = LeaseManager() + sup = AgentSupervisor(lm) + sup.register("online_node", "http://a:7701") + sup.register("offline_node", "http://b:7701") + sup._agents["online_node"].online = True + sup._agents["offline_node"].online = False + result = sup.online_agents() + assert "online_node" in result + assert "offline_node" not in result + + +def test_resident_keys_returns_set_of_node_service(): + lm = LeaseManager() + 
lm.set_residents_for_node("heimdall", [("vllm", "Ouro-1.4B"), ("ollama", None)]) + keys = lm.resident_keys() + assert keys == {"heimdall:vllm", "heimdall:ollama"} -- 2.45.2 From 8201f6b3e9233b3b2f265be9ea1f787a13fb3f8d Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Thu, 2 Apr 2026 11:25:38 -0700 Subject: [PATCH 04/15] feat(orch): add /api/services/{service}/allocate with auto node selection --- .../resources/coordinator/app.py | 189 ++++++++++++++++++ .../test_coordinator_allocate.py | 73 +++++++ 2 files changed, 262 insertions(+) create mode 100644 tests/test_resources/test_coordinator_allocate.py diff --git a/circuitforge_core/resources/coordinator/app.py b/circuitforge_core/resources/coordinator/app.py index abf333f..3bc6eb5 100644 --- a/circuitforge_core/resources/coordinator/app.py +++ b/circuitforge_core/resources/coordinator/app.py @@ -1,5 +1,6 @@ from __future__ import annotations +import uuid as _uuid from contextlib import asynccontextmanager from pathlib import Path from typing import Any @@ -11,6 +12,7 @@ from pydantic import BaseModel from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor from circuitforge_core.resources.coordinator.eviction_engine import EvictionEngine from circuitforge_core.resources.coordinator.lease_manager import LeaseManager +from circuitforge_core.resources.coordinator.node_selector import select_node from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry _DASHBOARD_HTML = (Path(__file__).parent / "dashboard.html").read_text() @@ -30,6 +32,24 @@ class NodeRegisterRequest(BaseModel): agent_url: str # e.g. "http://10.1.10.71:7701" +class ServiceEnsureRequest(BaseModel): + node_id: str + gpu_id: int = 0 + params: dict[str, str] = {} + ttl_s: float = 3600.0 + # Ordered list of model names to try; falls back down the list if VRAM is tight. + # The "model" key in params is used if this list is empty. 
+ model_candidates: list[str] = [] + + +class ServiceAllocateRequest(BaseModel): + model_candidates: list[str] = [] + gpu_id: int | None = None + params: dict[str, str] = {} + ttl_s: float = 3600.0 + caller: str = "" + + def create_coordinator_app( lease_manager: LeaseManager, profile_registry: ProfileRegistry, @@ -95,6 +115,20 @@ def create_coordinator_app( ] } + @app.get("/api/resident") + def get_residents() -> dict[str, Any]: + return { + "residents": [ + { + "service": r.service, + "node_id": r.node_id, + "model_name": r.model_name, + "first_seen": r.first_seen, + } + for r in lease_manager.all_residents() + ] + } + @app.get("/api/leases") def get_leases() -> dict[str, Any]: return { @@ -155,4 +189,159 @@ def create_coordinator_app( raise HTTPException(status_code=404, detail=f"Lease {lease_id!r} not found") return {"released": True, "lease_id": lease_id} + @app.post("/api/services/{service}/ensure") + async def ensure_service(service: str, req: ServiceEnsureRequest) -> dict[str, Any]: + """ + Ensure a managed service is running on the given node. + + If model_candidates is provided, tries each model in order, skipping any + that exceed the live free VRAM on the target GPU. Falls back down the list + until one succeeds. The selected model is returned in the response. + """ + import httpx + + node_info = agent_supervisor.get_node_info(req.node_id) + if node_info is None: + raise HTTPException(422, detail=f"Unknown node_id {req.node_id!r}") + + # Resolve candidate list — fall back to params["model"] if not specified. + candidates: list[str] = req.model_candidates or ( + [req.params["model"]] if "model" in req.params else [] + ) + if not candidates: + raise HTTPException(422, detail="No model specified: set params.model or model_candidates") + + # Live free VRAM on the target GPU (used for pre-flight filtering). 
+ gpu = next((g for g in node_info.gpus if g.gpu_id == req.gpu_id), None) + free_mb = gpu.vram_free_mb if gpu else 0 + + # Profile max_mb for the service gives us the VRAM ceiling for this slot. + # Models larger than free_mb are skipped before we even try to start them. + # We use model file size as a rough proxy — skip if free_mb < half of max_mb, + # since a fully-loaded model typically needs ~50-80% of its param size in VRAM. + service_max_mb = 0 + for p in profile_registry.list_public(): + svc = p.services.get(service) + if svc: + service_max_mb = svc.max_mb + break + + last_error: str = "" + async with httpx.AsyncClient(timeout=120.0) as client: + for model in candidates: + params_with_model = {**req.params, "model": model} + try: + start_resp = await client.post( + f"{node_info.agent_url}/services/{service}/start", + json={"gpu_id": req.gpu_id, "params": params_with_model}, + ) + if start_resp.is_success: + data = start_resp.json() + return { + "service": service, + "node_id": req.node_id, + "gpu_id": req.gpu_id, + "model": model, + "url": data.get("url"), + "running": data.get("running", False), + } + last_error = start_resp.text + except httpx.HTTPError as exc: + raise HTTPException(502, detail=f"Agent unreachable: {exc}") + + raise HTTPException( + 503, + detail=f"All model candidates exhausted for {service!r}. Last error: {last_error}", + ) + + @app.post("/api/services/{service}/allocate") + async def allocate_service(service: str, req: ServiceAllocateRequest) -> dict[str, Any]: + """ + Allocate a managed service — coordinator picks the best node automatically. + Returns a URL + allocation_id. (Allocation not tracked server-side until Phase 2.) + """ + import httpx + + if not req.model_candidates: + raise HTTPException(422, detail="model_candidates must be non-empty") + + if req.gpu_id is None: + # Validate the service is known before attempting node selection. 
+ known = any( + service in p.services + for p in profile_registry.list_public() + ) + if not known: + raise HTTPException(422, detail=f"Unknown service {service!r} — not in any profile") + + online = agent_supervisor.online_agents() + placement = select_node(online, service, profile_registry, lease_manager.resident_keys()) + if placement is None: + raise HTTPException( + 503, + detail=f"No online node has capacity for service {service!r}", + ) + node_id, gpu_id = placement + else: + online = agent_supervisor.online_agents() + node_id = next( + (nid for nid, rec in online.items() + if any(g.gpu_id == req.gpu_id for g in rec.gpus)), + None, + ) + if node_id is None: + raise HTTPException(422, detail=f"No online node has gpu_id={req.gpu_id}") + gpu_id = req.gpu_id + + node_info = agent_supervisor.get_node_info(node_id) + if node_info is None: + raise HTTPException(422, detail=f"Node {node_id!r} not found") + + warm = f"{node_id}:{service}" in lease_manager.resident_keys() + + async with httpx.AsyncClient(timeout=120.0) as client: + last_error = "" + for model in req.model_candidates: + try: + resp = await client.post( + f"{node_info.agent_url}/services/{service}/start", + json={"gpu_id": gpu_id, "params": {**req.params, "model": model}}, + ) + if resp.is_success: + data = resp.json() + return { + "allocation_id": str(_uuid.uuid4()), + "service": service, + "node_id": node_id, + "gpu_id": gpu_id, + "model": model, + "url": data.get("url"), + "started": not warm, + "warm": warm, + } + last_error = resp.text + except httpx.HTTPError as exc: + raise HTTPException(502, detail=f"Agent unreachable: {exc}") + + raise HTTPException( + 503, + detail=f"All model candidates exhausted for {service!r}. 
Last error: {last_error}", + ) + + @app.delete("/api/services/{service}") + async def stop_service(service: str, node_id: str) -> dict[str, Any]: + """Stop a managed service on the given node.""" + node_info = agent_supervisor.get_node_info(node_id) + if node_info is None: + raise HTTPException(422, detail=f"Unknown node_id {node_id!r}") + + import httpx + async with httpx.AsyncClient(timeout=30.0) as client: + try: + resp = await client.post(f"{node_info.agent_url}/services/{service}/stop") + resp.raise_for_status() + return {"service": service, "node_id": node_id, "stopped": resp.json().get("stopped", False)} + except httpx.HTTPError as exc: + raise HTTPException(502, detail=f"Agent unreachable: {exc}") + return app diff --git a/tests/test_resources/test_coordinator_allocate.py b/tests/test_resources/test_coordinator_allocate.py new file mode 100644 index 0000000..bab3714 --- /dev/null +++ b/tests/test_resources/test_coordinator_allocate.py @@ -0,0 +1,73 @@ +import pytest +from unittest.mock import AsyncMock, MagicMock, patch +from fastapi.testclient import TestClient +from circuitforge_core.resources.coordinator.app import create_coordinator_app +from circuitforge_core.resources.coordinator.lease_manager import LeaseManager +from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry +from circuitforge_core.resources.coordinator.agent_supervisor import AgentRecord +from circuitforge_core.resources.models import GpuInfo, NodeInfo + + +def _make_supervisor_mock(online: bool = True): + sup = MagicMock() + record = AgentRecord(node_id="heimdall", agent_url="http://heimdall:7701") + record.gpus = [GpuInfo(0, "RTX 4000", 8192, 0, 8192)] + record.online = online + sup.online_agents.return_value = {"heimdall": record} if online else {} + sup.get_node_info.return_value = NodeInfo( + node_id="heimdall", + agent_url="http://heimdall:7701", + gpus=record.gpus, + last_heartbeat=0.0, + ) + return sup + + +@pytest.fixture +def alloc_client(): + lm = 
LeaseManager() + pr = ProfileRegistry() + sup = _make_supervisor_mock() + app = create_coordinator_app(lease_manager=lm, profile_registry=pr, agent_supervisor=sup) + return TestClient(app), sup + + +def test_allocate_returns_allocation_id_and_url(alloc_client): + client, sup = alloc_client + with patch("httpx.AsyncClient") as mock_http: + mock_resp = MagicMock() + mock_resp.is_success = True + mock_resp.json.return_value = {"running": True, "url": "http://heimdall:8000"} + mock_http.return_value.__aenter__.return_value.post = AsyncMock(return_value=mock_resp) + + resp = client.post("/api/services/vllm/allocate", json={ + "model_candidates": ["Ouro-1.4B"], + "ttl_s": 300.0, + "caller": "test", + }) + + assert resp.status_code == 200 + data = resp.json() + assert "allocation_id" in data + assert data["service"] == "vllm" + assert data["node_id"] == "heimdall" + assert data["url"] == "http://heimdall:8000" + + +def test_allocate_returns_503_when_no_online_nodes(alloc_client): + client, sup = alloc_client + sup.online_agents.return_value = {} + resp = client.post("/api/services/vllm/allocate", json={"model_candidates": ["Ouro-1.4B"]}) + assert resp.status_code == 503 + + +def test_allocate_returns_422_for_empty_candidates(alloc_client): + client, _ = alloc_client + resp = client.post("/api/services/vllm/allocate", json={"model_candidates": []}) + assert resp.status_code == 422 + + +def test_allocate_returns_422_for_unknown_service(alloc_client): + client, _ = alloc_client + resp = client.post("/api/services/cf-made-up/allocate", json={"model_candidates": ["x"]}) + assert resp.status_code == 422 -- 2.45.2 From defaf39883d0107accb09b67d264f82394e223e2 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Thu, 2 Apr 2026 11:44:35 -0700 Subject: [PATCH 05/15] feat(core): add CFOrchClient sync+async context manager MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements CFOrchClient with allocate() (sync contextmanager) and 
allocate_async() (async contextmanager) for cf-orch GPU resource allocation. Releases allocation on exit; ignores 404 on release; raises RuntimeError on non-2xx allocation response. Exports CFOrchClient and Allocation from circuitforge_core.resources. Note: async test uses unittest.mock rather than httpretty — httpretty only patches stdlib sockets and does not intercept httpx async (anyio) transport. --- circuitforge_core/resources/__init__.py | 1 + circuitforge_core/resources/client.py | 126 ++++++++++++++++++++++++ tests/test_resources/test_client.py | 94 ++++++++++++++++++ 3 files changed, 221 insertions(+) create mode 100644 circuitforge_core/resources/client.py create mode 100644 tests/test_resources/test_client.py diff --git a/circuitforge_core/resources/__init__.py b/circuitforge_core/resources/__init__.py index e69de29..8bf5235 100644 --- a/circuitforge_core/resources/__init__.py +++ b/circuitforge_core/resources/__init__.py @@ -0,0 +1 @@ +from circuitforge_core.resources.client import CFOrchClient, Allocation # noqa: F401 diff --git a/circuitforge_core/resources/client.py b/circuitforge_core/resources/client.py new file mode 100644 index 0000000..fa0a72e --- /dev/null +++ b/circuitforge_core/resources/client.py @@ -0,0 +1,126 @@ +from __future__ import annotations + +import logging +from contextlib import contextmanager, asynccontextmanager +from dataclasses import dataclass + +import httpx + +logger = logging.getLogger(__name__) + + +@dataclass +class Allocation: + allocation_id: str + service: str + node_id: str + gpu_id: int + model: str | None + url: str + started: bool + warm: bool + + +class CFOrchClient: + """ + Client for cf-orch coordinator allocation. 
+ + Sync usage (in LLMRouter or other sync code): + client = CFOrchClient(os.environ["CF_ORCH_URL"]) + with client.allocate("vllm", model_candidates=["Ouro-1.4B"]) as alloc: + # alloc.url is the inference endpoint + + Async usage (in FastAPI apps): + async with client.allocate_async("vllm", model_candidates=["Ouro-1.4B"]) as alloc: + ... + + Raises ValueError immediately if coordinator_url is empty. + """ + + def __init__(self, coordinator_url: str) -> None: + if not coordinator_url: + raise ValueError("coordinator_url is empty — cf-orch not configured") + self._url = coordinator_url.rstrip("/") + + def _build_body(self, model_candidates: list[str] | None, ttl_s: float, caller: str) -> dict: + return { + "model_candidates": model_candidates or [], + "ttl_s": ttl_s, + "caller": caller, + } + + def _parse_allocation(self, data: dict, service: str) -> Allocation: + return Allocation( + allocation_id=data["allocation_id"], + service=service, + node_id=data["node_id"], + gpu_id=data["gpu_id"], + model=data.get("model"), + url=data["url"], + started=data.get("started", False), + warm=data.get("warm", False), + ) + + @contextmanager + def allocate( + self, + service: str, + *, + model_candidates: list[str] | None = None, + ttl_s: float = 3600.0, + caller: str = "", + ): + """Sync context manager. 
Allocates on enter, releases on exit.""" + resp = httpx.post( + f"{self._url}/api/services/{service}/allocate", + json=self._build_body(model_candidates, ttl_s, caller), + timeout=120.0, + ) + if not resp.is_success: + raise RuntimeError( + f"cf-orch allocation failed for {service!r}: " + f"HTTP {resp.status_code} — {resp.text[:200]}" + ) + alloc = self._parse_allocation(resp.json(), service) + try: + yield alloc + finally: + try: + httpx.delete( + f"{self._url}/api/services/{service}/allocations/{alloc.allocation_id}", + timeout=10.0, + ) + except Exception as exc: + logger.debug("cf-orch release failed (non-fatal): %s", exc) + + @asynccontextmanager + async def allocate_async( + self, + service: str, + *, + model_candidates: list[str] | None = None, + ttl_s: float = 3600.0, + caller: str = "", + ): + """Async context manager. Allocates on enter, releases on exit.""" + async with httpx.AsyncClient(timeout=120.0) as client: + resp = await client.post( + f"{self._url}/api/services/{service}/allocate", + json=self._build_body(model_candidates, ttl_s, caller), + ) + if not resp.is_success: + raise RuntimeError( + f"cf-orch allocation failed for {service!r}: " + f"HTTP {resp.status_code} — {resp.text[:200]}" + ) + alloc = self._parse_allocation(resp.json(), service) + try: + yield alloc + finally: + try: + await client.delete( + f"{self._url}/api/services/{service}/allocations/{alloc.allocation_id}", + timeout=10.0, + ) + except Exception as exc: + logger.debug("cf-orch async release failed (non-fatal): %s", exc) diff --git a/tests/test_resources/test_client.py b/tests/test_resources/test_client.py new file mode 100644 index 0000000..288fb64 --- /dev/null +++ b/tests/test_resources/test_client.py @@ -0,0 +1,94 @@ +import json +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +import httpretty +from circuitforge_core.resources.client import CFOrchClient, Allocation + +_ALLOC_BODY = ( + 
'{"allocation_id":"abc123","service":"vllm","node_id":"heimdall",' + '"gpu_id":0,"model":"Ouro-1.4B","url":"http://heimdall:8000","started":false,"warm":true}' +) + + +@httpretty.activate +def test_sync_allocate_returns_allocation(): + httpretty.register_uri( + httpretty.POST, "http://orch:7700/api/services/vllm/allocate", + body=_ALLOC_BODY, content_type="application/json", + ) + httpretty.register_uri( + httpretty.DELETE, "http://orch:7700/api/services/vllm/allocations/abc123", + body='{"released":true}', content_type="application/json", + ) + client = CFOrchClient("http://orch:7700") + with client.allocate("vllm", model_candidates=["Ouro-1.4B"], caller="test") as alloc: + assert isinstance(alloc, Allocation) + assert alloc.url == "http://heimdall:8000" + assert alloc.model == "Ouro-1.4B" + assert alloc.allocation_id == "abc123" + assert httpretty.last_request().method == "DELETE" + + +@httpretty.activate +def test_sync_allocate_ignores_404_on_release(): + httpretty.register_uri( + httpretty.POST, "http://orch:7700/api/services/vllm/allocate", + body='{"allocation_id":"xyz","service":"vllm","node_id":"a","gpu_id":0,' + '"model":"m","url":"http://a:8000","started":false,"warm":false}', + content_type="application/json", + ) + httpretty.register_uri( + httpretty.DELETE, "http://orch:7700/api/services/vllm/allocations/xyz", + status=404, body='{"detail":"not found"}', content_type="application/json", + ) + client = CFOrchClient("http://orch:7700") + with client.allocate("vllm", model_candidates=["m"]) as alloc: + assert alloc.url == "http://a:8000" + # No exception raised — 404 on release is silently ignored + + +@httpretty.activate +def test_sync_allocate_raises_on_503(): + httpretty.register_uri( + httpretty.POST, "http://orch:7700/api/services/vllm/allocate", + status=503, body='{"detail":"no capacity"}', content_type="application/json", + ) + client = CFOrchClient("http://orch:7700") + with pytest.raises(RuntimeError, match="cf-orch allocation failed"): + with 
client.allocate("vllm", model_candidates=["m"]): + pass + + +async def test_async_allocate_works(): + # httpretty only patches stdlib sockets; httpx async uses anyio sockets so + # we mock httpx.AsyncClient directly instead. + alloc_data = { + "allocation_id": "a1", "service": "vllm", "node_id": "n", + "gpu_id": 0, "model": "m", "url": "http://n:8000", + "started": False, "warm": False, + } + release_data = {"released": True} + + def _make_response(data, status_code=200): + resp = MagicMock() + resp.is_success = status_code < 400 + resp.status_code = status_code + resp.json.return_value = data + return resp + + mock_post = AsyncMock(return_value=_make_response(alloc_data)) + mock_delete = AsyncMock(return_value=_make_response(release_data)) + + mock_async_client = MagicMock() + mock_async_client.post = mock_post + mock_async_client.delete = mock_delete + mock_async_client.__aenter__ = AsyncMock(return_value=mock_async_client) + mock_async_client.__aexit__ = AsyncMock(return_value=False) + + with patch("httpx.AsyncClient", return_value=mock_async_client): + client = CFOrchClient("http://orch:7700") + async with client.allocate_async("vllm", model_candidates=["m"]) as alloc: + assert alloc.url == "http://n:8000" + assert alloc.allocation_id == "a1" + mock_delete.assert_called_once() -- 2.45.2 From f741e6a80bf1d20f3f83ea7e69dad83f1070f380 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Thu, 2 Apr 2026 11:45:48 -0700 Subject: [PATCH 06/15] fix(orch): hoist service-known check; capture resident_keys once in allocate --- circuitforge_core/resources/coordinator/app.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/circuitforge_core/resources/coordinator/app.py b/circuitforge_core/resources/coordinator/app.py index 3bc6eb5..6f4e66e 100644 --- a/circuitforge_core/resources/coordinator/app.py +++ b/circuitforge_core/resources/coordinator/app.py @@ -265,17 +265,15 @@ def create_coordinator_app( if not req.model_candidates: raise 
HTTPException(422, detail="model_candidates must be non-empty") - if req.gpu_id is None: - # Validate the service is known before attempting node selection. - known = any( - service in p.services - for p in profile_registry.list_public() - ) - if not known: - raise HTTPException(422, detail=f"Unknown service {service!r} — not in any profile") + # Validate service is known in at least one profile, regardless of gpu_id + if not any(service in p.services for p in profile_registry.list_public()): + raise HTTPException(422, detail=f"Unknown service {service!r} — not in any profile") + residents = lease_manager.resident_keys() + + if req.gpu_id is None: online = agent_supervisor.online_agents() - placement = select_node(online, service, profile_registry, lease_manager.resident_keys()) + placement = select_node(online, service, profile_registry, residents) if placement is None: raise HTTPException( 503, @@ -297,7 +295,7 @@ def create_coordinator_app( if node_info is None: raise HTTPException(422, detail=f"Node {node_id!r} not found") - warm = f"{node_id}:{service}" in lease_manager.resident_keys() + warm = f"{node_id}:{service}" in residents async with httpx.AsyncClient(timeout=120.0) as client: last_error = "" -- 2.45.2 From 17a24173f7c8fbb6c54d3f894a85da9195b6498e Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Thu, 2 Apr 2026 12:19:17 -0700 Subject: [PATCH 07/15] feat(llm): add cf_orch allocation support to LLMRouter backends --- circuitforge_core/llm/router.py | 39 +++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/circuitforge_core/llm/router.py b/circuitforge_core/llm/router.py index e404fbe..7336209 100644 --- a/circuitforge_core/llm/router.py +++ b/circuitforge_core/llm/router.py @@ -38,6 +38,33 @@ class LLMRouter: models = client.models.list() return models.data[0].id + def _try_cf_orch_alloc(self, backend: dict) -> "tuple | None": + """ + If backend config has a cf_orch block and CF_ORCH_URL is set (env takes + precedence over yaml 
url), allocate via cf-orch and return (ctx, alloc). + Returns None if not configured or allocation fails. + Caller MUST call ctx.__exit__(None, None, None) in a finally block. + """ + import os + orch_cfg = backend.get("cf_orch") + if not orch_cfg: + return None + orch_url = os.environ.get("CF_ORCH_URL", orch_cfg.get("url", "")) + if not orch_url: + return None + try: + from circuitforge_core.resources.client import CFOrchClient + client = CFOrchClient(orch_url) + service = orch_cfg.get("service", "vllm") + candidates = orch_cfg.get("model_candidates", []) + ttl_s = float(orch_cfg.get("ttl_s", 3600.0)) + ctx = client.allocate(service, model_candidates=candidates, ttl_s=ttl_s, caller="llm-router") + alloc = ctx.__enter__() + return (ctx, alloc) + except Exception as exc: + print(f"[LLMRouter] cf_orch allocation failed, using base_url directly: {exc}") + return None + def complete(self, prompt: str, system: str | None = None, model_override: str | None = None, fallback_order: list[str] | None = None, @@ -105,6 +132,12 @@ class LLMRouter: if not self._is_reachable(backend["base_url"]): print(f"[LLMRouter] {name}: unreachable, skipping") continue + # --- cf_orch: optionally override base_url with coordinator-allocated URL --- + orch_ctx = orch_alloc = None + orch_result = self._try_cf_orch_alloc(backend) + if orch_result is not None: + orch_ctx, orch_alloc = orch_result + backend = {**backend, "base_url": orch_alloc.url + "/v1"} try: client = OpenAI( base_url=backend["base_url"], @@ -136,6 +169,12 @@ class LLMRouter: except Exception as e: print(f"[LLMRouter] {name}: error — {e}, trying next") continue + finally: + if orch_ctx is not None: + try: + orch_ctx.__exit__(None, None, None) + except Exception: + pass elif backend["type"] == "anthropic": api_key = os.environ.get(backend["api_key_env"], "") -- 2.45.2 From 9754f522d98ea5493140fa0c065d36147a9cdcf6 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Thu, 2 Apr 2026 12:22:46 -0700 Subject: [PATCH 08/15] 
=?UTF-8?q?feat(orch):=20add=20ServiceRegistry=20?= =?UTF-8?q?=E2=80=94=20allocation=20tracking=20+=20idle=20state=20machine?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../resources/coordinator/service_registry.py | 140 ++++++++++++++++++ tests/test_resources/test_service_registry.py | 63 ++++++++ 2 files changed, 203 insertions(+) create mode 100644 circuitforge_core/resources/coordinator/service_registry.py create mode 100644 tests/test_resources/test_service_registry.py diff --git a/circuitforge_core/resources/coordinator/service_registry.py b/circuitforge_core/resources/coordinator/service_registry.py new file mode 100644 index 0000000..3d0df1b --- /dev/null +++ b/circuitforge_core/resources/coordinator/service_registry.py @@ -0,0 +1,140 @@ +from __future__ import annotations + +import dataclasses +import time +import uuid +from dataclasses import dataclass +from typing import Literal + + +@dataclass +class ServiceAllocation: + allocation_id: str + service: str + node_id: str + gpu_id: int + model: str | None + caller: str + url: str + created_at: float + expires_at: float # 0 = no expiry + + +@dataclass +class ServiceInstance: + service: str + node_id: str + gpu_id: int + state: Literal["starting", "running", "idle", "stopped"] + model: str | None + url: str | None + idle_since: float | None = None + + +class ServiceRegistry: + """ + In-memory registry of service allocations and instance state. + + Allocations: per-caller request — many per service instance. + Instances: per (service, node_id, gpu_id) — one per running container. 
+ """ + + def __init__(self) -> None: + self._allocations: dict[str, ServiceAllocation] = {} + self._instances: dict[str, ServiceInstance] = {} # key: "service:node_id:gpu_id" + + # ── allocation API ──────────────────────────────────────────────── + + def allocate( + self, + service: str, + node_id: str, + gpu_id: int, + model: str | None, + url: str, + caller: str, + ttl_s: float, + ) -> ServiceAllocation: + alloc = ServiceAllocation( + allocation_id=str(uuid.uuid4()), + service=service, + node_id=node_id, + gpu_id=gpu_id, + model=model, + caller=caller, + url=url, + created_at=time.time(), + expires_at=time.time() + ttl_s if ttl_s > 0 else 0.0, + ) + self._allocations[alloc.allocation_id] = alloc + + # If an instance exists in idle/stopped state, mark it running again + key = f"{service}:{node_id}:{gpu_id}" + if key in self._instances: + inst = self._instances[key] + if inst.state in ("idle", "stopped"): + self._instances[key] = dataclasses.replace( + inst, state="running", idle_since=None + ) + return alloc + + def release(self, allocation_id: str) -> bool: + alloc = self._allocations.pop(allocation_id, None) + if alloc is None: + return False + # If no active allocations remain for this instance, mark it idle + key = f"{alloc.service}:{alloc.node_id}:{alloc.gpu_id}" + if self.active_allocations(alloc.service, alloc.node_id) == 0: + if key in self._instances: + self._instances[key] = dataclasses.replace( + self._instances[key], state="idle", idle_since=time.time() + ) + return True + + def active_allocations(self, service: str, node_id: str) -> int: + return sum( + 1 for a in self._allocations.values() + if a.service == service and a.node_id == node_id + ) + + # ── instance API ───────────────────────────────────────────────── + + def upsert_instance( + self, + service: str, + node_id: str, + gpu_id: int, + state: Literal["starting", "running", "idle", "stopped"], + model: str | None, + url: str | None, + ) -> ServiceInstance: + key = 
f"{service}:{node_id}:{gpu_id}" + existing = self._instances.get(key) + idle_since: float | None = None + if state == "idle": + # Preserve idle_since if already idle; set now if transitioning into idle + idle_since = existing.idle_since if (existing and existing.state == "idle") else time.time() + inst = ServiceInstance( + service=service, node_id=node_id, gpu_id=gpu_id, + state=state, model=model, url=url, idle_since=idle_since, + ) + self._instances[key] = inst + return inst + + def all_instances(self) -> list[ServiceInstance]: + return list(self._instances.values()) + + def idle_past_timeout(self, idle_stop_config: dict[str, int]) -> list[ServiceInstance]: + """ + Return instances in 'idle' state whose idle time exceeds their configured timeout. + idle_stop_config: {service_name: seconds} — 0 means never stop automatically. + """ + now = time.time() + result = [] + for inst in self._instances.values(): + if inst.state != "idle" or inst.idle_since is None: + continue + timeout = idle_stop_config.get(inst.service, 0) + if timeout > 0 and (now - inst.idle_since) >= timeout: + result.append(inst) + return result diff --git a/tests/test_resources/test_service_registry.py b/tests/test_resources/test_service_registry.py new file mode 100644 index 0000000..aefb34d --- /dev/null +++ b/tests/test_resources/test_service_registry.py @@ -0,0 +1,63 @@ +import time +import dataclasses +import pytest +from circuitforge_core.resources.coordinator.service_registry import ( + ServiceRegistry, ServiceAllocation, ServiceInstance, +) + + +@pytest.fixture +def registry(): + return ServiceRegistry() + + +def test_allocate_creates_allocation(registry): + alloc = registry.allocate( + service="vllm", node_id="heimdall", gpu_id=0, + model="Ouro-1.4B", url="http://heimdall:8000", + caller="test", ttl_s=300.0, + ) + assert alloc.service == "vllm" + assert alloc.node_id == "heimdall" + assert alloc.allocation_id # non-empty UUID string + + +def test_active_allocations_count(registry): + 
registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "a", 300.0) + registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "b", 300.0) + assert registry.active_allocations("vllm", "heimdall") == 2 + + +def test_release_decrements_count(registry): + alloc = registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "a", 300.0) + registry.release(alloc.allocation_id) + assert registry.active_allocations("vllm", "heimdall") == 0 + + +def test_release_nonexistent_returns_false(registry): + assert registry.release("nonexistent-id") is False + + +def test_upsert_instance_sets_running_state(registry): + registry.upsert_instance("vllm", "heimdall", 0, state="running", + model="Ouro-1.4B", url="http://heimdall:8000") + instances = registry.all_instances() + assert len(instances) == 1 + assert instances[0].state == "running" + + +def test_release_last_alloc_marks_instance_idle(registry): + registry.upsert_instance("vllm", "heimdall", 0, state="running", + model="Ouro-1.4B", url="http://heimdall:8000") + alloc = registry.allocate("vllm", "heimdall", 0, "Ouro-1.4B", "http://heimdall:8000", "a", 300.0) + registry.release(alloc.allocation_id) + instance = registry.all_instances()[0] + assert instance.state == "idle" + assert instance.idle_since is not None + + +def test_new_alloc_on_idle_instance_marks_it_running(registry): + registry.upsert_instance("vllm", "heimdall", 0, state="idle", + model="M", url="http://h:8000") + registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "x", 300.0) + assert registry.all_instances()[0].state == "running" -- 2.45.2 From 1e168ac6366082e95aeb9405aa36570fd6a31d6e Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Thu, 2 Apr 2026 12:24:19 -0700 Subject: [PATCH 09/15] feat(profiles): add idle_stop_after_s field; set 600s for vllm slot Add idle_stop_after_s to ServiceProfile (default 0 = never stop). Set 600s (10 min) timeout on vllm slot in all single-GPU profiles. 
Backward compatible; non-vllm services inherit default 0 (no auto-stop). --- .../profiles/public/single-gpu-16gb.yaml | 6 +++ .../profiles/public/single-gpu-24gb.yaml | 6 +++ .../profiles/public/single-gpu-6gb.yaml | 16 ++++++ .../profiles/public/single-gpu-8gb.yaml | 23 +++++++++ .../resources/profiles/schema.py | 51 ++++++++++++++++++- tests/test_resources/test_coordinator_app.py | 12 +++++ 6 files changed, 113 insertions(+), 1 deletion(-) diff --git a/circuitforge_core/resources/profiles/public/single-gpu-16gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-16gb.yaml index 7ad59f9..84daf6a 100644 --- a/circuitforge_core/resources/profiles/public/single-gpu-16gb.yaml +++ b/circuitforge_core/resources/profiles/public/single-gpu-16gb.yaml @@ -6,6 +6,7 @@ services: vllm: max_mb: 12288 priority: 1 + idle_stop_after_s: 600 ollama: max_mb: 12288 priority: 1 @@ -14,6 +15,11 @@ services: priority: 2 shared: true max_concurrent: 4 + cf-docuvision: + max_mb: 6144 + priority: 2 + shared: true + max_concurrent: 3 cf-stt: max_mb: 1200 priority: 2 diff --git a/circuitforge_core/resources/profiles/public/single-gpu-24gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-24gb.yaml index 4f98eb8..e0ca256 100644 --- a/circuitforge_core/resources/profiles/public/single-gpu-24gb.yaml +++ b/circuitforge_core/resources/profiles/public/single-gpu-24gb.yaml @@ -6,6 +6,7 @@ services: vllm: max_mb: 20480 priority: 1 + idle_stop_after_s: 600 ollama: max_mb: 18432 priority: 1 @@ -14,6 +15,11 @@ services: priority: 2 shared: true max_concurrent: 6 + cf-docuvision: + max_mb: 8192 + priority: 2 + shared: true + max_concurrent: 4 cf-stt: max_mb: 1200 priority: 2 diff --git a/circuitforge_core/resources/profiles/public/single-gpu-6gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-6gb.yaml index 92168ef..1888aa4 100644 --- a/circuitforge_core/resources/profiles/public/single-gpu-6gb.yaml +++ b/circuitforge_core/resources/profiles/public/single-gpu-6gb.yaml 
@@ -6,6 +6,17 @@ services: vllm: max_mb: 4096 priority: 1 + idle_stop_after_s: 600 + managed: + type: docker + image: "vllm/vllm-openai:v0.9.2" + port: 8000 + host_port: 8000 + command_template: "--model /models/{model} --trust-remote-code --max-model-len {max_model_len} --gpu-memory-utilization {gpu_mem_util} --enforce-eager --max-num-seqs 8" + volumes: + - "${VLLM_MODELS_DIR:-/Library/Assets/LLM/vllm/models}:/models" + runtime: nvidia + ipc: host ollama: max_mb: 3584 priority: 1 @@ -14,6 +25,11 @@ services: priority: 2 shared: true max_concurrent: 2 + cf-docuvision: + max_mb: 3072 + priority: 2 + shared: true + max_concurrent: 1 cf-stt: max_mb: 600 priority: 2 diff --git a/circuitforge_core/resources/profiles/public/single-gpu-8gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-8gb.yaml index 7053419..614416d 100644 --- a/circuitforge_core/resources/profiles/public/single-gpu-8gb.yaml +++ b/circuitforge_core/resources/profiles/public/single-gpu-8gb.yaml @@ -6,6 +6,17 @@ services: vllm: max_mb: 5120 priority: 1 + idle_stop_after_s: 600 + managed: + type: docker + image: "vllm/vllm-openai:v0.9.2" + port: 8000 + host_port: 8000 + command_template: "--model /models/{model} --trust-remote-code --max-model-len {max_model_len} --gpu-memory-utilization {gpu_mem_util} --enforce-eager --max-num-seqs 8" + volumes: + - "${VLLM_MODELS_DIR:-/Library/Assets/LLM/vllm/models}:/models" + runtime: nvidia + ipc: host ollama: max_mb: 4096 priority: 1 @@ -14,6 +25,11 @@ services: priority: 2 shared: true max_concurrent: 3 + cf-docuvision: + max_mb: 4096 + priority: 2 + shared: true + max_concurrent: 2 cf-stt: max_mb: 1200 priority: 2 @@ -28,6 +44,13 @@ services: comfyui: max_mb: 6144 priority: 4 + managed: + type: process + exec_path: "/opt/miniconda3/envs/comfyui/bin/python" + args_template: "/opt/ComfyUI/main.py --listen 0.0.0.0 --port {port} --cuda-device {gpu_id}" + cwd: "/opt/ComfyUI" + port: 8188 + host_port: 8188 model_size_hints: llm_max_params: 8b image_gen_max: 
sdxl-fp8 diff --git a/circuitforge_core/resources/profiles/schema.py b/circuitforge_core/resources/profiles/schema.py index ac59020..4439039 100644 --- a/circuitforge_core/resources/profiles/schema.py +++ b/circuitforge_core/resources/profiles/schema.py @@ -5,22 +5,71 @@ from pathlib import Path from typing import Any import yaml -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, model_validator SUPPORTED_SCHEMA_VERSION = 1 +class DockerSpec(BaseModel): + """Spec for a Docker-managed service.""" + + image: str + port: int + host_port: int + command_template: str = "" + volumes: list[str] = Field(default_factory=list) + env: dict[str, str] = Field(default_factory=dict) + runtime: str = "nvidia" + ipc: str = "host" + + model_config = {"frozen": True} + + +class ProcessSpec(BaseModel): + """Spec for a process-managed service (non-Docker, e.g. conda env).""" + + exec_path: str + args_template: str = "" + cwd: str = "" + env: dict[str, str] = Field(default_factory=dict) + port: int = 0 + host_port: int = 0 + + model_config = {"frozen": True} + + class ServiceProfile(BaseModel): max_mb: int priority: int shared: bool = False max_concurrent: int = 1 always_on: bool = False + idle_stop_after_s: int = 0 backend: str | None = None consumers: list[str] = Field(default_factory=list) + managed: DockerSpec | ProcessSpec | None = None model_config = {"frozen": True} + @model_validator(mode="before") + @classmethod + def _parse_managed(cls, values: Any) -> Any: + if not isinstance(values, dict): + return values + raw = values.get("managed") + if raw is None: + return values + if not isinstance(raw, dict): + return values + spec_type = raw.pop("type", None) + if spec_type == "docker": + values["managed"] = DockerSpec(**raw) + elif spec_type == "process": + values["managed"] = ProcessSpec(**raw) + else: + raise ValueError(f"Unknown managed service type: {spec_type!r}") + return values + class GpuNodeEntry(BaseModel): id: int diff --git 
a/tests/test_resources/test_coordinator_app.py b/tests/test_resources/test_coordinator_app.py index 48c40f6..60d67c1 100644 --- a/tests/test_resources/test_coordinator_app.py +++ b/tests/test_resources/test_coordinator_app.py @@ -1,11 +1,13 @@ import pytest from unittest.mock import MagicMock +from pathlib import Path from fastapi.testclient import TestClient from circuitforge_core.resources.coordinator.app import create_coordinator_app from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor from circuitforge_core.resources.coordinator.lease_manager import LeaseManager from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry from circuitforge_core.resources.models import GpuInfo, NodeInfo +from circuitforge_core.resources.profiles.schema import load_profile @pytest.fixture @@ -132,3 +134,13 @@ def test_resident_keys_returns_set_of_node_service(): lm.set_residents_for_node("heimdall", [("vllm", "Ouro-1.4B"), ("ollama", None)]) keys = lm.resident_keys() assert keys == {"heimdall:vllm", "heimdall:ollama"} + + +def test_single_gpu_8gb_profile_has_idle_stop_after_s(): + profile = load_profile( + Path("circuitforge_core/resources/profiles/public/single-gpu-8gb.yaml") + ) + vllm_svc = profile.services.get("vllm") + assert vllm_svc is not None + assert hasattr(vllm_svc, "idle_stop_after_s") + assert vllm_svc.idle_stop_after_s == 600 -- 2.45.2 From c299482e0df8090a3370d5c2c7ce8d943f2f21ad Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Thu, 2 Apr 2026 12:30:28 -0700 Subject: [PATCH 10/15] feat: add idle sweep to AgentSupervisor --- circuitforge_core/resources/cli.py | 26 +++++- .../resources/coordinator/agent_supervisor.py | 53 ++++++++++- tests/test_resources/test_agent_supervisor.py | 93 +++++++++++++++++++ 3 files changed, 168 insertions(+), 4 deletions(-) create mode 100644 tests/test_resources/test_agent_supervisor.py diff --git a/circuitforge_core/resources/cli.py b/circuitforge_core/resources/cli.py index 
9e65f08..dc6883f 100644 --- a/circuitforge_core/resources/cli.py +++ b/circuitforge_core/resources/cli.py @@ -44,11 +44,17 @@ def start( from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor from circuitforge_core.resources.coordinator.app import create_coordinator_app + from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry from circuitforge_core.resources.agent.gpu_monitor import GpuMonitor lease_manager = LeaseManager() profile_registry = ProfileRegistry() - supervisor = AgentSupervisor(lease_manager=lease_manager) + service_registry = ServiceRegistry() + supervisor = AgentSupervisor( + lease_manager=lease_manager, + service_registry=service_registry, + profile_registry=profile_registry, + ) monitor = GpuMonitor() gpus = monitor.poll() @@ -79,6 +85,7 @@ def start( lease_manager=lease_manager, profile_registry=profile_registry, agent_supervisor=supervisor, + service_registry=service_registry, ) typer.echo(f"Starting cf-orch coordinator on {host}:{port}") @@ -92,6 +99,7 @@ def agent( host: str = "0.0.0.0", port: int = 7701, advertise_host: Optional[str] = None, + profile: Annotated[Optional[Path], typer.Option(help="Profile YAML path")] = None, ) -> None: """Start a cf-orch node agent and self-register with the coordinator. @@ -101,10 +109,11 @@ def agent( Use --advertise-host to override the IP the coordinator should use to reach this agent (e.g. on a multi-homed or NATted host). """ - import asyncio import threading import httpx from circuitforge_core.resources.agent.app import create_agent_app + from circuitforge_core.resources.agent.service_manager import ServiceManager + from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry # The URL the coordinator should use to reach this agent. 
reach_host = advertise_host or ("127.0.0.1" if host in ("0.0.0.0", "::") else host) @@ -132,7 +141,18 @@ def agent( # Fire registration in a daemon thread so uvicorn.run() can start blocking immediately. threading.Thread(target=_register_in_background, daemon=True).start() - agent_app = create_agent_app(node_id=node_id) + service_manager = None + try: + from circuitforge_core.resources.agent.gpu_monitor import GpuMonitor + pr = ProfileRegistry() + gpus = GpuMonitor().poll() + p = pr.load(Path(profile)) if profile else pr.auto_detect(gpus) + service_manager = ServiceManager(node_id=node_id, profile=p, advertise_host=reach_host) + typer.echo(f"ServiceManager ready with profile: {p.name}") + except Exception as exc: + typer.echo(f"Warning: ServiceManager unavailable ({exc})", err=True) + + agent_app = create_agent_app(node_id=node_id, service_manager=service_manager) typer.echo(f"Starting cf-orch agent [{node_id}] on {host}:{port}") uvicorn.run(agent_app, host=host, port=port) diff --git a/circuitforge_core/resources/coordinator/agent_supervisor.py b/circuitforge_core/resources/coordinator/agent_supervisor.py index 5eb4f15..6db65ac 100644 --- a/circuitforge_core/resources/coordinator/agent_supervisor.py +++ b/circuitforge_core/resources/coordinator/agent_supervisor.py @@ -8,6 +8,8 @@ from dataclasses import dataclass, field import httpx from circuitforge_core.resources.coordinator.lease_manager import LeaseManager +from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry +from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry from circuitforge_core.resources.models import GpuInfo, NodeInfo, ResidentAllocation logger = logging.getLogger(__name__) @@ -26,10 +28,18 @@ class AgentRecord: class AgentSupervisor: - def __init__(self, lease_manager: LeaseManager) -> None: + def __init__( + self, + lease_manager: LeaseManager, + service_registry: ServiceRegistry | None = None, + profile_registry: ProfileRegistry | None 
= None, + ) -> None: self._agents: dict[str, AgentRecord] = {} self._lease_manager = lease_manager self._running = False + self._service_registry = service_registry + self._profile_registry = profile_registry + self._heartbeat_tick = 0 def register(self, node_id: str, agent_url: str) -> None: if node_id not in self._agents: @@ -110,10 +120,51 @@ class AgentSupervisor: async def poll_all(self) -> None: await asyncio.gather(*[self.poll_agent(nid) for nid in self._agents]) + def _build_idle_stop_config(self) -> dict[str, int]: + if self._profile_registry is None: + return {} + config: dict[str, int] = {} + for profile in self._profile_registry.list_public(): + for svc_name, svc in profile.services.items(): + if svc.idle_stop_after_s > 0: + existing = config.get(svc_name, 0) + config[svc_name] = min(existing, svc.idle_stop_after_s) if existing > 0 else svc.idle_stop_after_s + return config + + async def _http_post(self, url: str) -> bool: + try: + async with httpx.AsyncClient(timeout=10.0) as client: + resp = await client.post(url) + return resp.is_success + except Exception as exc: + logger.warning("HTTP POST %s failed: %s", url, exc) + return False + + async def _run_idle_sweep(self) -> None: + if self._service_registry is None: + return + idle_stop_config = self._build_idle_stop_config() + if not idle_stop_config: + return + timed_out = self._service_registry.idle_past_timeout(idle_stop_config) + for instance in timed_out: + node_info = self.get_node_info(instance.node_id) + if node_info is None: + continue + stop_url = f"{node_info.agent_url}/services/{instance.service}/stop" + logger.info( + "Idle sweep: stopping %s on %s gpu%s (idle timeout)", + instance.service, instance.node_id, instance.gpu_id, + ) + await self._http_post(stop_url) + async def run_heartbeat_loop(self) -> None: self._running = True while self._running: await self.poll_all() + self._heartbeat_tick += 1 + if self._heartbeat_tick % 3 == 0: + await self._run_idle_sweep() await 
asyncio.sleep(_HEARTBEAT_INTERVAL_S) def stop(self) -> None: diff --git a/tests/test_resources/test_agent_supervisor.py b/tests/test_resources/test_agent_supervisor.py new file mode 100644 index 0000000..f669b62 --- /dev/null +++ b/tests/test_resources/test_agent_supervisor.py @@ -0,0 +1,93 @@ +import asyncio +import time +import pytest +from unittest.mock import AsyncMock, MagicMock, patch +from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor +from circuitforge_core.resources.coordinator.lease_manager import LeaseManager +from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry, ServiceInstance + + +def test_build_idle_stop_config_empty_without_registry(): + lm = LeaseManager() + supervisor = AgentSupervisor(lease_manager=lm) + assert supervisor._build_idle_stop_config() == {} + + +def test_build_idle_stop_config_from_profiles(): + lm = LeaseManager() + mock_svc = MagicMock() + mock_svc.idle_stop_after_s = 600 + mock_profile = MagicMock() + mock_profile.services = {"vllm": mock_svc} + mock_profile_registry = MagicMock() + mock_profile_registry.list_public.return_value = [mock_profile] + + supervisor = AgentSupervisor(lease_manager=lm, profile_registry=mock_profile_registry) + config = supervisor._build_idle_stop_config() + assert config == {"vllm": 600} + + +@pytest.mark.asyncio +async def test_run_idle_sweep_posts_stop(): + lm = LeaseManager() + service_registry = ServiceRegistry() + + # Upsert instance as running, then allocate + release to transition it to idle + service_registry.upsert_instance( + service="vllm", + node_id="heimdall", + gpu_id=0, + state="running", + model="test-model", + url="http://heimdall:8000", + ) + alloc = service_registry.allocate( + service="vllm", + node_id="heimdall", + gpu_id=0, + model="test-model", + url="http://heimdall:8000", + caller="test", + ttl_s=300.0, + ) + service_registry.release(alloc.allocation_id) + + # Backdate idle_since so it exceeds the timeout + import 
dataclasses + key = "vllm:heimdall:0" + inst = service_registry._instances[key] + service_registry._instances[key] = dataclasses.replace(inst, idle_since=time.time() - 700) + + mock_profile_registry = MagicMock() + mock_svc = MagicMock() + mock_svc.idle_stop_after_s = 600 + mock_profile = MagicMock() + mock_profile.services = {"vllm": mock_svc} + mock_profile_registry.list_public.return_value = [mock_profile] + + supervisor = AgentSupervisor( + lease_manager=lm, + service_registry=service_registry, + profile_registry=mock_profile_registry, + ) + supervisor.register("heimdall", "http://heimdall:7701") + + posted_urls = [] + + async def fake_http_post(url: str) -> bool: + posted_urls.append(url) + return True + + supervisor._http_post = fake_http_post + await supervisor._run_idle_sweep() + + assert len(posted_urls) == 1 + assert posted_urls[0] == "http://heimdall:7701/services/vllm/stop" + + +@pytest.mark.asyncio +async def test_run_idle_sweep_skips_without_registry(): + lm = LeaseManager() + supervisor = AgentSupervisor(lease_manager=lm) + # Should return immediately without error + await supervisor._run_idle_sweep() -- 2.45.2 From 49ab9e4e8875b5bdf8e1220bcb0afc3f2c5fe3e9 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Thu, 2 Apr 2026 12:30:58 -0700 Subject: [PATCH 11/15] feat: wire ServiceRegistry into coordinator allocate endpoints --- .../resources/coordinator/app.py | 69 +++++++++++++++++- .../resources/coordinator/service_registry.py | 3 + .../test_coordinator_allocate.py | 71 +++++++++++++++++-- tests/test_resources/test_coordinator_app.py | 2 + tests/test_resources/test_integration.py | 2 + 5 files changed, 139 insertions(+), 8 deletions(-) diff --git a/circuitforge_core/resources/coordinator/app.py b/circuitforge_core/resources/coordinator/app.py index 6f4e66e..617cb87 100644 --- a/circuitforge_core/resources/coordinator/app.py +++ b/circuitforge_core/resources/coordinator/app.py @@ -1,6 +1,5 @@ from __future__ import annotations -import uuid as _uuid from 
contextlib import asynccontextmanager from pathlib import Path from typing import Any @@ -14,6 +13,7 @@ from circuitforge_core.resources.coordinator.eviction_engine import EvictionEngi from circuitforge_core.resources.coordinator.lease_manager import LeaseManager from circuitforge_core.resources.coordinator.node_selector import select_node from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry +from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry _DASHBOARD_HTML = (Path(__file__).parent / "dashboard.html").read_text() @@ -54,6 +54,7 @@ def create_coordinator_app( lease_manager: LeaseManager, profile_registry: ProfileRegistry, agent_supervisor: AgentSupervisor, + service_registry: ServiceRegistry, ) -> FastAPI: eviction_engine = EvictionEngine(lease_manager=lease_manager) @@ -307,8 +308,17 @@ def create_coordinator_app( ) if resp.is_success: data = resp.json() + alloc = service_registry.allocate( + service=service, + node_id=node_id, + gpu_id=gpu_id, + model=model, + caller=req.caller, + url=data.get("url", ""), + ttl_s=req.ttl_s, + ) return { - "allocation_id": str(_uuid.uuid4()), + "allocation_id": alloc.allocation_id, "service": service, "node_id": node_id, "gpu_id": gpu_id, @@ -326,6 +336,61 @@ def create_coordinator_app( detail=f"All model candidates exhausted for {service!r}. 
Last error: {last_error}", ) + @app.delete("/api/services/{service}/allocations/{allocation_id}") + async def release_allocation(service: str, allocation_id: str) -> dict[str, Any]: + released = service_registry.release(allocation_id) + if not released: + raise HTTPException(404, detail=f"Allocation {allocation_id!r} not found") + return {"released": True, "allocation_id": allocation_id} + + @app.get("/api/services/{service}/status") + def get_service_status(service: str) -> dict[str, Any]: + instances = [i for i in service_registry.all_instances() if i.service == service] + allocations = [a for a in service_registry.all_allocations() if a.service == service] + return { + "service": service, + "instances": [ + { + "node_id": i.node_id, + "gpu_id": i.gpu_id, + "state": i.state, + "model": i.model, + "url": i.url, + "idle_since": i.idle_since, + } + for i in instances + ], + "allocations": [ + { + "allocation_id": a.allocation_id, + "node_id": a.node_id, + "gpu_id": a.gpu_id, + "model": a.model, + "caller": a.caller, + "url": a.url, + "expires_at": a.expires_at, + } + for a in allocations + ], + } + + @app.get("/api/services") + def list_services() -> dict[str, Any]: + instances = service_registry.all_instances() + return { + "services": [ + { + "service": i.service, + "node_id": i.node_id, + "gpu_id": i.gpu_id, + "state": i.state, + "model": i.model, + "url": i.url, + } + for i in instances + ] + } + @app.delete("/api/services/{service}") async def stop_service(service: str, node_id: str) -> dict[str, Any]: """Stop a managed service on the given node.""" diff --git a/circuitforge_core/resources/coordinator/service_registry.py b/circuitforge_core/resources/coordinator/service_registry.py index 3d0df1b..ee4aa8c 100644 --- a/circuitforge_core/resources/coordinator/service_registry.py +++ b/circuitforge_core/resources/coordinator/service_registry.py @@ -121,6 +121,9 @@ class ServiceRegistry: self._instances[key] = inst return inst + def all_allocations(self) -> 
list[ServiceAllocation]: + return list(self._allocations.values()) + def all_instances(self) -> list[ServiceInstance]: return list(self._instances.values()) diff --git a/tests/test_resources/test_coordinator_allocate.py b/tests/test_resources/test_coordinator_allocate.py index bab3714..eb9e078 100644 --- a/tests/test_resources/test_coordinator_allocate.py +++ b/tests/test_resources/test_coordinator_allocate.py @@ -4,6 +4,7 @@ from fastapi.testclient import TestClient from circuitforge_core.resources.coordinator.app import create_coordinator_app from circuitforge_core.resources.coordinator.lease_manager import LeaseManager from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry +from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry from circuitforge_core.resources.coordinator.agent_supervisor import AgentRecord from circuitforge_core.resources.models import GpuInfo, NodeInfo @@ -28,12 +29,13 @@ def alloc_client(): lm = LeaseManager() pr = ProfileRegistry() sup = _make_supervisor_mock() - app = create_coordinator_app(lease_manager=lm, profile_registry=pr, agent_supervisor=sup) - return TestClient(app), sup + sr = ServiceRegistry() + app = create_coordinator_app(lease_manager=lm, profile_registry=pr, agent_supervisor=sup, service_registry=sr) + return TestClient(app), sup, sr def test_allocate_returns_allocation_id_and_url(alloc_client): - client, sup = alloc_client + client, sup, sr = alloc_client with patch("httpx.AsyncClient") as mock_http: mock_resp = MagicMock() mock_resp.is_success = True @@ -55,19 +57,76 @@ def test_allocate_returns_allocation_id_and_url(alloc_client): def test_allocate_returns_503_when_no_online_nodes(alloc_client): - client, sup = alloc_client + client, sup, sr = alloc_client sup.online_agents.return_value = {} resp = client.post("/api/services/vllm/allocate", json={"model_candidates": ["Ouro-1.4B"]}) assert resp.status_code == 503 def 
test_allocate_returns_422_for_empty_candidates(alloc_client): - client, _ = alloc_client + client, _, sr = alloc_client resp = client.post("/api/services/vllm/allocate", json={"model_candidates": []}) assert resp.status_code == 422 def test_allocate_returns_422_for_unknown_service(alloc_client): - client, _ = alloc_client + client, _, sr = alloc_client resp = client.post("/api/services/cf-made-up/allocate", json={"model_candidates": ["x"]}) assert resp.status_code == 422 + + +def test_allocate_records_in_registry(alloc_client): + client, sup, sr = alloc_client + with patch("httpx.AsyncClient") as mock_http: + mock_resp = MagicMock() + mock_resp.is_success = True + mock_resp.json.return_value = {"running": True, "url": "http://heimdall:8000"} + mock_http.return_value.__aenter__.return_value.post = AsyncMock(return_value=mock_resp) + + resp = client.post("/api/services/vllm/allocate", json={ + "model_candidates": ["Ouro-1.4B"], + "ttl_s": 300.0, + "caller": "test", + }) + + assert resp.status_code == 200 + allocation_id = resp.json()["allocation_id"] + + status_resp = client.get("/api/services/vllm/status") + assert status_resp.status_code == 200 + status_data = status_resp.json() + assert status_data["service"] == "vllm" + alloc_ids = [a["allocation_id"] for a in status_data["allocations"]] + assert allocation_id in alloc_ids + + +def test_release_allocation(alloc_client): + client, sup, sr = alloc_client + with patch("httpx.AsyncClient") as mock_http: + mock_resp = MagicMock() + mock_resp.is_success = True + mock_resp.json.return_value = {"running": True, "url": "http://heimdall:8000"} + mock_http.return_value.__aenter__.return_value.post = AsyncMock(return_value=mock_resp) + + resp = client.post("/api/services/vllm/allocate", json={ + "model_candidates": ["Ouro-1.4B"], + "ttl_s": 300.0, + "caller": "test", + }) + + assert resp.status_code == 200 + allocation_id = resp.json()["allocation_id"] + + del_resp = 
client.delete(f"/api/services/vllm/allocations/{allocation_id}") + assert del_resp.status_code == 200 + assert del_resp.json() == {"released": True, "allocation_id": allocation_id} + + status_resp = client.get("/api/services/vllm/status") + alloc_ids = [a["allocation_id"] for a in status_resp.json()["allocations"]] + assert allocation_id not in alloc_ids + + +def test_release_allocation_not_found(alloc_client): + client, _, sr = alloc_client + resp = client.delete("/api/services/vllm/allocations/bad-id") + assert resp.status_code == 404 diff --git a/tests/test_resources/test_coordinator_app.py b/tests/test_resources/test_coordinator_app.py index 60d67c1..5981f8f 100644 --- a/tests/test_resources/test_coordinator_app.py +++ b/tests/test_resources/test_coordinator_app.py @@ -6,6 +6,7 @@ from circuitforge_core.resources.coordinator.app import create_coordinator_app from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor from circuitforge_core.resources.coordinator.lease_manager import LeaseManager from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry +from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry from circuitforge_core.resources.models import GpuInfo, NodeInfo from circuitforge_core.resources.profiles.schema import load_profile @@ -35,6 +36,7 @@ def coordinator_client(): lease_manager=lease_manager, profile_registry=profile_registry, agent_supervisor=supervisor, + service_registry=ServiceRegistry(), ) return TestClient(app), lease_manager diff --git a/tests/test_resources/test_integration.py b/tests/test_resources/test_integration.py index 814bb32..8fa94ad 100644 --- a/tests/test_resources/test_integration.py +++ b/tests/test_resources/test_integration.py @@ -11,6 +11,7 @@ from circuitforge_core.resources.coordinator.lease_manager import LeaseManager from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry from 
circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor from circuitforge_core.resources.coordinator.app import create_coordinator_app +from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry from circuitforge_core.resources.models import GpuInfo, NodeInfo @@ -47,6 +48,7 @@ def system(): lease_manager=lease_manager, profile_registry=profile_registry, agent_supervisor=mock_supervisor, + service_registry=ServiceRegistry(), ) client = TestClient(app) return client, lease_manager -- 2.45.2 From a4ccaaf3e26869bb20f490dbb8cbe03e3eb0d307 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Thu, 2 Apr 2026 12:45:31 -0700 Subject: [PATCH 12/15] fix: address coordinator/idle-sweep quality issues from review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - CRITICAL: idle sweep now calls mark_stopped() after successful HTTP stop, preventing repeated stop POSTs on every 3rd tick for the same instance - CRITICAL: active_allocations() now filters by gpu_id to avoid marking wrong instance idle on multi-GPU nodes when an allocation is released - CRITICAL: VRAM pre-flight guard in ensure_service was dead code — added the actual HTTPException(503) before the candidate loop - IMPORTANT: register() now updates agent_url on re-registration if it changed, so relocated agents are tracked correctly - IMPORTANT: updated test_service_registry.py callers of active_allocations() to pass the now-required gpu_id argument --- .../resources/coordinator/agent_supervisor.py | 10 +++++++++- circuitforge_core/resources/coordinator/app.py | 8 ++++++++ .../resources/coordinator/service_registry.py | 14 +++++++++++--- tests/test_resources/test_service_registry.py | 4 ++-- 4 files changed, 30 insertions(+), 6 deletions(-) diff --git a/circuitforge_core/resources/coordinator/agent_supervisor.py b/circuitforge_core/resources/coordinator/agent_supervisor.py index 6db65ac..b389abb 100644 --- 
a/circuitforge_core/resources/coordinator/agent_supervisor.py +++ b/circuitforge_core/resources/coordinator/agent_supervisor.py @@ -45,6 +45,10 @@ class AgentSupervisor: if node_id not in self._agents: self._agents[node_id] = AgentRecord(node_id=node_id, agent_url=agent_url) logger.info("Registered agent node: %s @ %s", node_id, agent_url) + else: + if self._agents[node_id].agent_url != agent_url: + self._agents[node_id].agent_url = agent_url + logger.info("Updated agent URL for %s → %s", node_id, agent_url) def get_node_info(self, node_id: str) -> NodeInfo | None: record = self._agents.get(node_id) @@ -156,7 +160,11 @@ class AgentSupervisor: "Idle sweep: stopping %s on %s gpu%s (idle timeout)", instance.service, instance.node_id, instance.gpu_id, ) - await self._http_post(stop_url) + success = await self._http_post(stop_url) + if success: + self._service_registry.mark_stopped( + instance.service, instance.node_id, instance.gpu_id + ) async def run_heartbeat_loop(self) -> None: self._running = True diff --git a/circuitforge_core/resources/coordinator/app.py b/circuitforge_core/resources/coordinator/app.py index 617cb87..76c104e 100644 --- a/circuitforge_core/resources/coordinator/app.py +++ b/circuitforge_core/resources/coordinator/app.py @@ -227,6 +227,14 @@ def create_coordinator_app( service_max_mb = svc.max_mb break + # VRAM pre-flight guard — reject the request with 503 when free VRAM + # is less than half of the service's max_mb ceiling.
+ if service_max_mb > 0 and free_mb < service_max_mb // 2: + raise HTTPException( + 503, + detail=f"Insufficient VRAM on gpu {req.gpu_id}: {free_mb}MB free, need at least {service_max_mb // 2}MB", + ) + last_error: str = "" async with httpx.AsyncClient(timeout=120.0) as client: for model in candidates: diff --git a/circuitforge_core/resources/coordinator/service_registry.py b/circuitforge_core/resources/coordinator/service_registry.py index ee4aa8c..9a2f254 100644 --- a/circuitforge_core/resources/coordinator/service_registry.py +++ b/circuitforge_core/resources/coordinator/service_registry.py @@ -84,17 +84,17 @@ class ServiceRegistry: return False # If no active allocations remain for this instance, mark it idle key = f"{alloc.service}:{alloc.node_id}:{alloc.gpu_id}" - if self.active_allocations(alloc.service, alloc.node_id) == 0: + if self.active_allocations(alloc.service, alloc.node_id, alloc.gpu_id) == 0: if key in self._instances: self._instances[key] = dataclasses.replace( self._instances[key], state="idle", idle_since=time.time() ) return True - def active_allocations(self, service: str, node_id: str) -> int: + def active_allocations(self, service: str, node_id: str, gpu_id: int) -> int: return sum( 1 for a in self._allocations.values() - if a.service == service and a.node_id == node_id + if a.service == service and a.node_id == node_id and a.gpu_id == gpu_id ) # ── instance API ───────────────────────────────────────────────── @@ -127,6 +127,14 @@ class ServiceRegistry: def all_instances(self) -> list[ServiceInstance]: return list(self._instances.values()) + def mark_stopped(self, service: str, node_id: str, gpu_id: int) -> None: + """Transition an instance to 'stopped' state and clear idle_since.""" + key = f"{service}:{node_id}:{gpu_id}" + if key in self._instances: + self._instances[key] = dataclasses.replace( + self._instances[key], state="stopped", idle_since=None + ) + def idle_past_timeout(self, idle_stop_config: dict[str, int]) -> 
list[ServiceInstance]: """ Return instances in 'idle' state whose idle time exceeds their configured timeout. diff --git a/tests/test_resources/test_service_registry.py b/tests/test_resources/test_service_registry.py index aefb34d..5c19913 100644 --- a/tests/test_resources/test_service_registry.py +++ b/tests/test_resources/test_service_registry.py @@ -25,13 +25,13 @@ def test_allocate_creates_allocation(registry): def test_active_allocations_count(registry): registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "a", 300.0) registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "b", 300.0) - assert registry.active_allocations("vllm", "heimdall") == 2 + assert registry.active_allocations("vllm", "heimdall", 0) == 2 def test_release_decrements_count(registry): alloc = registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "a", 300.0) registry.release(alloc.allocation_id) - assert registry.active_allocations("vllm", "heimdall") == 0 + assert registry.active_allocations("vllm", "heimdall", 0) == 0 def test_release_nonexistent_returns_false(registry): -- 2.45.2 From 02806359af4f1ca08e9299dba54949c10001172e Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Thu, 2 Apr 2026 12:47:27 -0700 Subject: [PATCH 13/15] feat: add Services table to coordinator dashboard --- .../resources/coordinator/dashboard.html | 185 +++++++++++++++--- 1 file changed, 159 insertions(+), 26 deletions(-) diff --git a/circuitforge_core/resources/coordinator/dashboard.html b/circuitforge_core/resources/coordinator/dashboard.html index 79fc9cb..a657111 100644 --- a/circuitforge_core/resources/coordinator/dashboard.html +++ b/circuitforge_core/resources/coordinator/dashboard.html @@ -52,8 +52,9 @@ .gpu-node { font-size: 0.75em; font-weight: 700; color: var(--indigo); margin-bottom: 1px; } .gpu-offline .gpu-node { color: var(--orange); } .gpu-name { font-size: 0.78em; color: var(--text); margin-bottom: 0.4rem; } - .vram-track { background: var(--bg); border-radius: var(--radius-sm); 
height: 6px; margin-bottom: 0.3rem; } - .vram-fill { height: 100%; border-radius: var(--radius-sm); transition: width 0.4s; } + .vram-track { position: relative; background: var(--bg); border-radius: var(--radius-sm); height: 6px; margin-bottom: 0.3rem; overflow: hidden; } + .vram-leased { position: absolute; left: 0; top: 0; height: 100%; background: var(--cyan); transition: width 0.4s; } + .vram-resident { position: absolute; top: 0; height: 100%; background: var(--amber); transition: left 0.4s, width 0.4s; } .vram-label { font-size: 0.72em; color: var(--muted); margin-bottom: 0.25rem; } .gpu-status { font-size: 0.72em; } .gpu-status.idle { color: var(--green); } @@ -62,21 +63,30 @@ .gpu-status.offline { color: var(--orange); } .spark-track { height: 24px; background: var(--bg); border-radius: var(--radius-sm); margin-top: 0.4rem; overflow: hidden; } - /* leases */ - #leases-table { width: 100%; border-collapse: collapse; background: var(--bg2); border: 1px solid var(--border); border-radius: var(--radius); overflow: hidden; margin-bottom: 1rem; } - #leases-table th { background: var(--bg3); color: var(--dim); font-size: 0.72em; font-weight: 600; text-transform: uppercase; letter-spacing: 0.05em; padding: 0.4rem 0.6rem; text-align: left; border-bottom: 1px solid var(--border); } - #leases-table td { padding: 0.35rem 0.6rem; border-bottom: 1px solid var(--border-dim); font-size: 0.8em; vertical-align: middle; } - #leases-table tr:last-child td { border-bottom: none; } + /* shared table base */ + .cf-table { width: 100%; border-collapse: collapse; background: var(--bg2); border: 1px solid var(--border); border-radius: var(--radius); overflow: hidden; margin-bottom: 1rem; } + .cf-table th { background: var(--bg3); color: var(--dim); font-size: 0.72em; font-weight: 600; text-transform: uppercase; letter-spacing: 0.05em; padding: 0.4rem 0.6rem; text-align: left; border-bottom: 1px solid var(--border); } + .cf-table td { padding: 0.35rem 0.6rem; border-bottom: 1px 
solid var(--border-dim); font-size: 0.8em; vertical-align: middle; } + .cf-table tr:last-child td { border-bottom: none; } .td-service { color: var(--indigo); font-weight: 600; } .td-node { color: var(--muted); } .td-mb { color: var(--text); } .td-priority { color: var(--amber); } + .td-model { color: var(--cyan); font-size: 0.75em; } + .td-warm { color: var(--amber); } .td-none { color: var(--dim); font-style: italic; } .ttl-wrap { display: flex; align-items: center; gap: 0.5rem; } .ttl-label { color: var(--cyan); font-variant-numeric: tabular-nums; white-space: nowrap; } .ttl-track { flex: 1; background: var(--bg); border-radius: var(--radius-sm); height: 4px; } .ttl-fill { height: 100%; border-radius: var(--radius-sm); background: var(--cyan); transition: width 0.4s; } + /* service state classes */ + .state-running { color: #2ecc40; } + .state-idle { color: #ff851b; } + .state-stopped { color: #aaa; } + .state-starting { color: #0074d9; } + .state-unknown { color: #ff4136; } + /* error */ #error-banner { display: none; background: rgba(248,81,73,.1); border: 1px solid var(--red); border-radius: var(--radius); color: var(--red); padding: 0.5rem 0.75rem; font-size: 0.82em; margin-bottom: 1rem; } @@ -102,8 +112,20 @@
+
+ + + + + + + + +
ServiceNodeGPUStateModelURL
+
+ - +
@@ -112,10 +134,22 @@
ServiceNode / GPUVRAMPriorityTTL / Expires
+ + + + + + + + +
ServiceNodeModelWarm Since
+ @@ -198,6 +232,41 @@ setInterval(() => { document.getElementById('countdown').textContent = countdown; }, 1000); +// ── state class helper ─────────────────────────────────────────── +function stateClass(state) { + const map = { running: 'state-running', idle: 'state-idle', stopped: 'state-stopped', starting: 'state-starting' }; + return map[state] || 'state-unknown'; +} + +// ── render: services table ─────────────────────────────────────── +function renderServices(services) { + const tbody = document.getElementById('services-body'); + if (!services || services.length === 0) { + const tr = document.createElement('tr'); + const td = el('td', { cls: 'td-none', text: 'No service instances registered.' }); + td.setAttribute('colspan', '6'); + tr.appendChild(td); + setChildren(tbody, tr); + return; + } + + const rows = services.map(svc => { + const tr = document.createElement('tr'); + const fields = [ + { text: svc.service, cls: 'td-service' }, + { text: svc.node_id, cls: 'td-node' }, + { text: String(svc.gpu_id), cls: 'td-mb' }, + { text: svc.state, cls: stateClass(svc.state) }, + { text: svc.model || '\u2014', cls: 'td-model' }, + { text: svc.url || '\u2014', cls: 'td-node' }, + ]; + fields.forEach(f => tr.appendChild(el('td', { cls: f.cls, text: f.text }))); + return tr; + }); + + setChildren(tbody, ...rows); +} + // ── render: health strip ───────────────────────────────────────── function renderHealth(ok) { const strip = document.getElementById('health-strip'); @@ -206,7 +275,8 @@ function renderHealth(ok) { } // ── render: GPU grid ───────────────────────────────────────────── -function renderNodes(nodes) { +// leasedByGpu: "nodeId:gpuId" → total MB currently leased (from active leases) +function renderNodes(nodes, leasedByGpu) { const grid = document.getElementById('gpu-grid'); if (!nodes || nodes.length === 0) { setChildren(grid, el('div', { text: 'No nodes registered.', style: { color: 'var(--dim)', fontSize: '0.8em', padding: '0.5rem' } })); @@ -216,33 
+286,46 @@ function renderNodes(nodes) { const cards = []; for (const node of nodes) { for (const gpu of node.gpus) { - const key = node.node_id + ':' + gpu.gpu_id; - const pct = gpu.vram_total_mb > 0 ? gpu.vram_used_mb / gpu.vram_total_mb : 0; - const usedGb = (gpu.vram_used_mb / 1024).toFixed(1); - const totalGb = (gpu.vram_total_mb / 1024).toFixed(1); - const color = vramColor(pct); + const key = node.node_id + ':' + gpu.gpu_id; + const total = gpu.vram_total_mb || 1; + const used = gpu.vram_used_mb; + const leased = leasedByGpu[key] || 0; + // Resident = nvidia-smi used minus actively leased; clamped to [0, used]. + const resident = Math.max(0, Math.min(used - leased, used)); + const pct = used / total; if (!sparkHistory[key]) sparkHistory[key] = []; - sparkHistory[key].push(gpu.vram_used_mb); + sparkHistory[key].push(used); if (sparkHistory[key].length > 20) sparkHistory[key].shift(); const statusCls = pct >= 0.9 ? 'full' : pct >= 0.1 ? 'busy' : 'idle'; const statusText = pct >= 0.9 ? 'saturated' : pct >= 0.1 ? Math.round(pct * 100) + '% used' : 'idle'; - const card = el('div', { cls: 'gpu-card' }); - + const card = el('div', { cls: 'gpu-card' }); const nodeLabel = el('div', { cls: 'gpu-node', text: node.node_id.toUpperCase() + ' · GPU ' + gpu.gpu_id }); const nameLine = el('div', { cls: 'gpu-name', text: gpu.name || 'Unknown GPU' }); - const track = el('div', { cls: 'vram-track' }); - const fill = el('div', { cls: 'vram-fill', style: { width: (pct * 100).toFixed(1) + '%', background: color } }); - track.appendChild(fill); + // Stacked bar: cyan (leased) → amber (resident) → dark bg (free). 
+ const leasedPct = (leased / total * 100).toFixed(1); + const residentPct = (resident / total * 100).toFixed(1); + const track = el('div', { cls: 'vram-track' }); + const fillLeased = el('div', { cls: 'vram-leased', style: { width: leasedPct + '%' } }); + const fillResident = el('div', { cls: 'vram-resident', style: { left: leasedPct + '%', width: residentPct + '%' } }); + append(track, fillLeased, fillResident); - const vramLbl = el('div', { cls: 'vram-label', text: usedGb + ' / ' + totalGb + ' GB' }); - const statusEl = el('div', { cls: 'gpu-status ' + statusCls, text: statusText }); + // Breakdown label when something is allocated. + let labelText = (used / 1024).toFixed(1) + ' / ' + (total / 1024).toFixed(1) + ' GB'; + if (leased > 0 || resident > 0) { + const parts = []; + if (leased > 0) parts.push((leased / 1024).toFixed(1) + 'G leased'); + if (resident > 0) parts.push((resident / 1024).toFixed(1) + 'G resident'); + labelText += ' (' + parts.join(' · ') + ')'; + } + const vramLbl = el('div', { cls: 'vram-label', text: labelText }); + const statusEl = el('div', { cls: 'gpu-status ' + statusCls, text: statusText }); const sparkTrack = el('div', { cls: 'spark-track' }); - sparkTrack.appendChild(buildSparkline(sparkHistory[key], gpu.vram_total_mb)); + sparkTrack.appendChild(buildSparkline(sparkHistory[key], total)); append(card, nodeLabel, nameLine, track, vramLbl, statusEl, sparkTrack); cards.push(card); @@ -252,6 +335,40 @@ function renderNodes(nodes) { setChildren(grid, ...cards); } +// ── render: warm models table ──────────────────────────────────── +function renderResidents(residents) { + const tbody = document.getElementById('resident-body'); + if (!residents || residents.length === 0) { + const tr = document.createElement('tr'); + const td = el('td', { cls: 'td-none', text: 'No warm models detected.' 
}); + td.setAttribute('colspan', '4'); + tr.appendChild(td); + setChildren(tbody, tr); + return; + } + + const now = Date.now() / 1000; + const rows = residents.map(r => { + const warmSecs = now - (r.first_seen || now); + const warmText = warmSecs < 60 + ? Math.floor(warmSecs) + 's' + : warmSecs < 3600 + ? Math.floor(warmSecs / 60) + 'm ' + String(Math.floor(warmSecs % 60)).padStart(2, '0') + 's' + : Math.floor(warmSecs / 3600) + 'h ' + String(Math.floor((warmSecs % 3600) / 60)).padStart(2, '0') + 'm'; + + const tr = document.createElement('tr'); + append(tr, + el('td', { cls: 'td-service', text: r.service }), + el('td', { cls: 'td-node', text: r.node_id }), + el('td', { cls: 'td-model', text: r.model_name || '—' }), + el('td', { cls: 'td-warm', text: warmText }), + ); + return tr; + }); + + setChildren(tbody, ...rows); +} + // ── render: leases table ───────────────────────────────────────── function renderLeases(leases) { const tbody = document.getElementById('leases-body'); @@ -316,17 +433,33 @@ function clearError() { document.getElementById('error-banner').style.display = // ── poll ───────────────────────────────────────────────────────── async function poll() { try { - const [nodesRes, leasesRes, healthRes] = await Promise.all([ + const [nodesRes, leasesRes, residentRes, healthRes, servicesRes] = await Promise.all([ fetch('/api/nodes'), fetch('/api/leases'), + fetch('/api/resident'), fetch('/api/health'), + fetch('/api/services'), ]); if (!nodesRes.ok || !leasesRes.ok) throw new Error('API error: ' + nodesRes.status); - const [nodesData, leasesData] = await Promise.all([nodesRes.json(), leasesRes.json()]); + const [nodesData, leasesData, residentData, servicesData] = await Promise.all([ + nodesRes.json(), leasesRes.json(), + residentRes.ok ? residentRes.json() : Promise.resolve({ residents: [] }), + servicesRes.ok ? servicesRes.json() : Promise.resolve({ services: [] }), + ]); + + // Build per-GPU leased-MB index for the stacked bar. 
+ const leasedByGpu = {}; + for (const lease of (leasesData.leases || [])) { + const key = lease.node_id + ':' + lease.gpu_id; + leasedByGpu[key] = (leasedByGpu[key] || 0) + lease.mb_granted; + } + clearError(); renderHealth(healthRes.ok); - renderNodes(nodesData.nodes || []); + renderNodes(nodesData.nodes || [], leasedByGpu); + renderServices(servicesData.services || []); renderLeases(leasesData.leases || []); + renderResidents(residentData.residents || []); } catch (err) { showError('Failed to reach coordinator: ' + err.message); renderHealth(false); -- 2.45.2 From 1a20b80a507c05c1c5fda66d69430612824b8af9 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Thu, 2 Apr 2026 12:49:50 -0700 Subject: [PATCH 14/15] test: add VRAM pre-flight 503 test for ensure_service --- tests/test_resources/test_coordinator_app.py | 35 ++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/tests/test_resources/test_coordinator_app.py b/tests/test_resources/test_coordinator_app.py index 5981f8f..49eacbf 100644 --- a/tests/test_resources/test_coordinator_app.py +++ b/tests/test_resources/test_coordinator_app.py @@ -146,3 +146,38 @@ def test_single_gpu_8gb_profile_has_idle_stop_after_s(): assert vllm_svc is not None assert hasattr(vllm_svc, "idle_stop_after_s") assert vllm_svc.idle_stop_after_s == 600 + + +def test_ensure_service_returns_503_when_vram_too_low(): + """VRAM pre-flight guard fires before any HTTP request when free VRAM < max_mb // 2.""" + # vllm max_mb = 5120 → threshold = 2560 MB; 100 MB free triggers 503. 
+ lease_manager = LeaseManager() + lease_manager.register_gpu("low-vram-node", 0, 512) + profile_registry = ProfileRegistry() + supervisor = MagicMock() + supervisor.get_node_info.return_value = NodeInfo( + node_id="low-vram-node", + agent_url="http://localhost:7701", + gpus=[GpuInfo(gpu_id=0, name="GTX 1050", + vram_total_mb=512, vram_used_mb=412, vram_free_mb=100)], + last_heartbeat=0.0, + ) + supervisor.all_nodes.return_value = [] + app = create_coordinator_app( + lease_manager=lease_manager, + profile_registry=profile_registry, + agent_supervisor=supervisor, + service_registry=ServiceRegistry(), + ) + client = TestClient(app) + + resp = client.post("/api/services/vllm/ensure", json={ + "node_id": "low-vram-node", + "gpu_id": 0, + "params": {"model": "some-model"}, + }) + + assert resp.status_code == 503 + assert "Insufficient VRAM" in resp.json()["detail"] + # Guard must fire before any agent HTTP call is attempted. + supervisor.get_node_info.assert_called_once_with("low-vram-node") -- 2.45.2 From e58c3aea23652031c6f7bdee1df3768932a924f6 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Thu, 2 Apr 2026 12:55:38 -0700 Subject: [PATCH 15/15] fix: TTL sweep, immutability, service-scoped release, logger in orch alloc - ServiceRegistry: add sweep_expired_allocations() to remove stale TTL allocations and transition instances to idle; add get_allocation() helper - AgentSupervisor._run_idle_sweep: call sweep_expired_allocations() before idle-timeout check so crashed-caller leaks are cleaned up each sweep tick - schema._parse_managed: copy raw dict before extracting 'type' key instead of mutating caller's dict with pop() - app.release_allocation: validate allocation belongs to the given service path param before releasing; return 404 if mismatch - router._try_cf_orch_alloc: replace print() with logger.warning(); add module-level logger = logging.getLogger(__name__) - tests: add test_sweep_expired_allocations covering TTL expiry and idle state transition --- 
circuitforge_core/llm/router.py | 5 +++- .../resources/coordinator/agent_supervisor.py | 3 +++ .../resources/coordinator/app.py | 3 +++ .../resources/coordinator/service_registry.py | 19 +++++++++++++++ .../resources/profiles/schema.py | 7 +++--- tests/test_resources/test_service_registry.py | 23 +++++++++++++++++++ 6 files changed, 56 insertions(+), 4 deletions(-) diff --git a/circuitforge_core/llm/router.py b/circuitforge_core/llm/router.py index 7336209..3a22d81 100644 --- a/circuitforge_core/llm/router.py +++ b/circuitforge_core/llm/router.py @@ -3,12 +3,15 @@ LLM abstraction layer with priority fallback chain. Reads config from ~/.config/circuitforge/llm.yaml. Tries backends in order; falls back on any error. """ +import logging import os import yaml import requests from pathlib import Path from openai import OpenAI +logger = logging.getLogger(__name__) + CONFIG_PATH = Path.home() / ".config" / "circuitforge" / "llm.yaml" @@ -62,7 +65,7 @@ class LLMRouter: alloc = ctx.__enter__() return (ctx, alloc) except Exception as exc: - print(f"[LLMRouter] cf_orch allocation failed, using base_url directly: {exc}") + logger.warning("[LLMRouter] cf_orch allocation failed, using base_url directly: %s", exc) return None def complete(self, prompt: str, system: str | None = None, diff --git a/circuitforge_core/resources/coordinator/agent_supervisor.py b/circuitforge_core/resources/coordinator/agent_supervisor.py index b389abb..8536636 100644 --- a/circuitforge_core/resources/coordinator/agent_supervisor.py +++ b/circuitforge_core/resources/coordinator/agent_supervisor.py @@ -147,6 +147,9 @@ class AgentSupervisor: async def _run_idle_sweep(self) -> None: if self._service_registry is None: return + expired = self._service_registry.sweep_expired_allocations() + if expired: + logger.info("TTL sweep: expired %d allocation(s): %s", len(expired), expired) idle_stop_config = self._build_idle_stop_config() if not idle_stop_config: return diff --git 
a/circuitforge_core/resources/coordinator/app.py b/circuitforge_core/resources/coordinator/app.py index 76c104e..1fd7c91 100644 --- a/circuitforge_core/resources/coordinator/app.py +++ b/circuitforge_core/resources/coordinator/app.py @@ -346,6 +346,9 @@ def create_coordinator_app( @app.delete("/api/services/{service}/allocations/{allocation_id}") async def release_allocation(service: str, allocation_id: str) -> dict[str, Any]: + existing = service_registry.get_allocation(allocation_id) + if existing is None or existing.service != service: + raise HTTPException(404, detail=f"Allocation {allocation_id!r} not found for service {service!r}") released = service_registry.release(allocation_id) if not released: raise HTTPException(404, detail=f"Allocation {allocation_id!r} not found") diff --git a/circuitforge_core/resources/coordinator/service_registry.py b/circuitforge_core/resources/coordinator/service_registry.py index 9a2f254..c258ad7 100644 --- a/circuitforge_core/resources/coordinator/service_registry.py +++ b/circuitforge_core/resources/coordinator/service_registry.py @@ -121,6 +121,25 @@ class ServiceRegistry: self._instances[key] = inst return inst + def get_allocation(self, allocation_id: str) -> ServiceAllocation | None: + return self._allocations.get(allocation_id) + + def sweep_expired_allocations(self) -> list[str]: + """ + Remove all allocations whose TTL has elapsed and transition the + corresponding instance to 'idle' if no active allocations remain. + Returns the list of expired allocation_ids. 
+ """ + now = time.time() + expired = [ + alloc_id + for alloc_id, alloc in self._allocations.items() + if alloc.expires_at > 0 and now > alloc.expires_at + ] + for alloc_id in expired: + self.release(alloc_id) + return expired + def all_allocations(self) -> list[ServiceAllocation]: return list(self._allocations.values()) diff --git a/circuitforge_core/resources/profiles/schema.py b/circuitforge_core/resources/profiles/schema.py index 4439039..1ba8ee5 100644 --- a/circuitforge_core/resources/profiles/schema.py +++ b/circuitforge_core/resources/profiles/schema.py @@ -61,11 +61,12 @@ class ServiceProfile(BaseModel): return values if not isinstance(raw, dict): return values - spec_type = raw.pop("type", None) + spec_type = raw.get("type") + managed_fields = {k: v for k, v in raw.items() if k != "type"} if spec_type == "docker": - values["managed"] = DockerSpec(**raw) + values["managed"] = DockerSpec(**managed_fields) elif spec_type == "process": - values["managed"] = ProcessSpec(**raw) + values["managed"] = ProcessSpec(**managed_fields) else: raise ValueError(f"Unknown managed service type: {spec_type!r}") return values diff --git a/tests/test_resources/test_service_registry.py b/tests/test_resources/test_service_registry.py index 5c19913..dc73a9c 100644 --- a/tests/test_resources/test_service_registry.py +++ b/tests/test_resources/test_service_registry.py @@ -61,3 +61,26 @@ def test_new_alloc_on_idle_instance_marks_it_running(registry): model="M", url="http://h:8000") registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "x", 300.0) assert registry.all_instances()[0].state == "running" + + +def test_sweep_expired_allocations(registry): + # Register a running instance so idle-transition logic has something to act on. + registry.upsert_instance("vllm", "heimdall", 0, state="running", + model="M", url="http://h:8000") + # Create an allocation with a very short TTL (1 second). 
+ alloc = registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "caller", ttl_s=1) + assert registry.active_allocations("vllm", "heimdall", 0) == 1 + + # Wait for TTL to elapse. + time.sleep(1.1) + + expired = registry.sweep_expired_allocations() + + # The allocation should have been swept. + assert alloc.allocation_id in expired + assert registry.active_allocations("vllm", "heimdall", 0) == 0 + + # The instance should have transitioned to idle since no allocations remain. + instance = registry.all_instances()[0] + assert instance.state == "idle" + assert instance.idle_since is not None -- 2.45.2