From a4ccaaf3e26869bb20f490dbb8cbe03e3eb0d307 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Thu, 2 Apr 2026 12:45:31 -0700 Subject: [PATCH] fix: address coordinator/idle-sweep quality issues from review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - CRITICAL: idle sweep now calls mark_stopped() after successful HTTP stop, preventing repeated stop POSTs on every 3rd tick for the same instance - CRITICAL: active_allocations() now filters by gpu_id to avoid marking wrong instance idle on multi-GPU nodes when an allocation is released - CRITICAL: VRAM pre-flight guard in ensure_service was dead code — added the actual HTTPException(503) before the candidate loop - IMPORTANT: register() now updates agent_url on re-registration if it changed, so relocated agents are tracked correctly - IMPORTANT: updated test_service_registry.py callers of active_allocations() to pass the now-required gpu_id argument --- .../resources/coordinator/agent_supervisor.py | 10 +++++++++- circuitforge_core/resources/coordinator/app.py | 8 ++++++++ .../resources/coordinator/service_registry.py | 14 +++++++++++--- tests/test_resources/test_service_registry.py | 4 ++-- 4 files changed, 30 insertions(+), 6 deletions(-) diff --git a/circuitforge_core/resources/coordinator/agent_supervisor.py b/circuitforge_core/resources/coordinator/agent_supervisor.py index 6db65ac..b389abb 100644 --- a/circuitforge_core/resources/coordinator/agent_supervisor.py +++ b/circuitforge_core/resources/coordinator/agent_supervisor.py @@ -45,6 +45,10 @@ class AgentSupervisor: if node_id not in self._agents: self._agents[node_id] = AgentRecord(node_id=node_id, agent_url=agent_url) logger.info("Registered agent node: %s @ %s", node_id, agent_url) + else: + if self._agents[node_id].agent_url != agent_url: + self._agents[node_id].agent_url = agent_url + logger.info("Updated agent URL for %s → %s", node_id, agent_url) def get_node_info(self, node_id: str) -> NodeInfo | None: record = 
self._agents.get(node_id) @@ -156,7 +160,11 @@ class AgentSupervisor: "Idle sweep: stopping %s on %s gpu%s (idle timeout)", instance.service, instance.node_id, instance.gpu_id, ) - await self._http_post(stop_url) + success = await self._http_post(stop_url) + if success: + self._service_registry.mark_stopped( + instance.service, instance.node_id, instance.gpu_id + ) async def run_heartbeat_loop(self) -> None: self._running = True diff --git a/circuitforge_core/resources/coordinator/app.py b/circuitforge_core/resources/coordinator/app.py index 617cb87..76c104e 100644 --- a/circuitforge_core/resources/coordinator/app.py +++ b/circuitforge_core/resources/coordinator/app.py @@ -227,6 +227,14 @@ def create_coordinator_app( service_max_mb = svc.max_mb break + + # VRAM pre-flight guard — reject the request with 503 when free VRAM + # is less than half of the service's max_mb ceiling. + if service_max_mb > 0 and free_mb < service_max_mb // 2: + raise HTTPException( + 503, + detail=f"Insufficient VRAM on gpu {req.gpu_id}: {free_mb}MB free, need at least {service_max_mb // 2}MB", + ) + last_error: str = "" async with httpx.AsyncClient(timeout=120.0) as client: for model in candidates: diff --git a/circuitforge_core/resources/coordinator/service_registry.py b/circuitforge_core/resources/coordinator/service_registry.py index ee4aa8c..9a2f254 100644 --- a/circuitforge_core/resources/coordinator/service_registry.py +++ b/circuitforge_core/resources/coordinator/service_registry.py @@ -84,17 +84,17 @@ class ServiceRegistry: return False # If no active allocations remain for this instance, mark it idle key = f"{alloc.service}:{alloc.node_id}:{alloc.gpu_id}" - if self.active_allocations(alloc.service, alloc.node_id) == 0: + if self.active_allocations(alloc.service, alloc.node_id, alloc.gpu_id) == 0: if key in self._instances: self._instances[key] = dataclasses.replace( self._instances[key], state="idle", idle_since=time.time() ) return True - def active_allocations(self, service: str, 
node_id: str) -> int: + def active_allocations(self, service: str, node_id: str, gpu_id: int) -> int: return sum( 1 for a in self._allocations.values() - if a.service == service and a.node_id == node_id + if a.service == service and a.node_id == node_id and a.gpu_id == gpu_id ) # ── instance API ───────────────────────────────────────────────── @@ -127,6 +127,14 @@ class ServiceRegistry: def all_instances(self) -> list[ServiceInstance]: return list(self._instances.values()) + def mark_stopped(self, service: str, node_id: str, gpu_id: int) -> None: + """Transition an instance to 'stopped' state and clear idle_since.""" + key = f"{service}:{node_id}:{gpu_id}" + if key in self._instances: + self._instances[key] = dataclasses.replace( + self._instances[key], state="stopped", idle_since=None + ) + def idle_past_timeout(self, idle_stop_config: dict[str, int]) -> list[ServiceInstance]: """ Return instances in 'idle' state whose idle time exceeds their configured timeout. diff --git a/tests/test_resources/test_service_registry.py b/tests/test_resources/test_service_registry.py index aefb34d..5c19913 100644 --- a/tests/test_resources/test_service_registry.py +++ b/tests/test_resources/test_service_registry.py @@ -25,13 +25,13 @@ def test_allocate_creates_allocation(registry): def test_active_allocations_count(registry): registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "a", 300.0) registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "b", 300.0) - assert registry.active_allocations("vllm", "heimdall") == 2 + assert registry.active_allocations("vllm", "heimdall", 0) == 2 def test_release_decrements_count(registry): alloc = registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "a", 300.0) registry.release(alloc.allocation_id) - assert registry.active_allocations("vllm", "heimdall") == 0 + assert registry.active_allocations("vllm", "heimdall", 0) == 0 def test_release_nonexistent_returns_false(registry):