fix: address coordinator/idle-sweep quality issues from review

- CRITICAL: idle sweep now calls mark_stopped() after successful HTTP stop,
  preventing repeated stop POSTs on every 3rd tick for the same instance
- CRITICAL: active_allocations() now filters by gpu_id to avoid marking wrong
  instance idle on multi-GPU nodes when an allocation is released
- CRITICAL: VRAM pre-flight guard in ensure_service was dead code — added the
  actual HTTPException(503) before the candidate loop
- IMPORTANT: register() now updates agent_url on re-registration if it changed,
  so relocated agents are tracked correctly
- IMPORTANT: updated test_service_registry.py callers of active_allocations()
  to pass the now-required gpu_id argument
This commit is contained in:
pyr0ball 2026-04-02 12:45:31 -07:00
parent 49ab9e4e88
commit a4ccaaf3e2
4 changed files with 30 additions and 6 deletions

View file

@ -45,6 +45,10 @@ class AgentSupervisor:
if node_id not in self._agents:
self._agents[node_id] = AgentRecord(node_id=node_id, agent_url=agent_url)
logger.info("Registered agent node: %s @ %s", node_id, agent_url)
else:
if self._agents[node_id].agent_url != agent_url:
self._agents[node_id].agent_url = agent_url
logger.info("Updated agent URL for %s%s", node_id, agent_url)
def get_node_info(self, node_id: str) -> NodeInfo | None:
record = self._agents.get(node_id)
@ -156,7 +160,11 @@ class AgentSupervisor:
"Idle sweep: stopping %s on %s gpu%s (idle timeout)",
instance.service, instance.node_id, instance.gpu_id,
)
await self._http_post(stop_url)
success = await self._http_post(stop_url)
if success:
self._service_registry.mark_stopped(
instance.service, instance.node_id, instance.gpu_id
)
async def run_heartbeat_loop(self) -> None:
self._running = True

View file

@ -227,6 +227,14 @@ def create_coordinator_app(
service_max_mb = svc.max_mb
break
# VRAM pre-flight guard — reject the request with a 503 when free VRAM
# is less than half of the service's max_mb ceiling.
if service_max_mb > 0 and free_mb < service_max_mb // 2:
raise HTTPException(
503,
detail=f"Insufficient VRAM on gpu {req.gpu_id}: {free_mb}MB free, need at least {service_max_mb // 2}MB",
)
last_error: str = ""
async with httpx.AsyncClient(timeout=120.0) as client:
for model in candidates:

View file

@ -84,17 +84,17 @@ class ServiceRegistry:
return False
# If no active allocations remain for this instance, mark it idle
key = f"{alloc.service}:{alloc.node_id}:{alloc.gpu_id}"
if self.active_allocations(alloc.service, alloc.node_id) == 0:
if self.active_allocations(alloc.service, alloc.node_id, alloc.gpu_id) == 0:
if key in self._instances:
self._instances[key] = dataclasses.replace(
self._instances[key], state="idle", idle_since=time.time()
)
return True
def active_allocations(self, service: str, node_id: str, gpu_id: int) -> int:
    """Count the allocations recorded for one service instance.

    An allocation is counted only when its ``service``, ``node_id`` and
    ``gpu_id`` all match the arguments, so allocations held on a different
    GPU of the same node are not included in the total.

    Args:
        service: Service name to match.
        node_id: Node identifier to match.
        gpu_id: GPU identifier to match.

    Returns:
        Number of entries in ``self._allocations`` matching all three fields.
    """
    return sum(
        1
        for a in self._allocations.values()
        if a.service == service and a.node_id == node_id and a.gpu_id == gpu_id
    )
# ── instance API ─────────────────────────────────────────────────
@ -127,6 +127,14 @@ class ServiceRegistry:
def all_instances(self) -> list[ServiceInstance]:
    """Return a new list containing every value held in ``self._instances``."""
    return list(self._instances.values())
def mark_stopped(self, service: str, node_id: str, gpu_id: int) -> None:
    """Transition an instance to 'stopped' state and clear idle_since.

    The instance is looked up in ``self._instances`` under the key
    ``"{service}:{node_id}:{gpu_id}"``. When no entry exists under that
    key the call does nothing.

    Args:
        service: Service name component of the instance key.
        node_id: Node identifier component of the instance key.
        gpu_id: GPU identifier component of the instance key.
    """
    key = f"{service}:{node_id}:{gpu_id}"
    if key not in self._instances:
        return
    # Store a modified copy under the same key instead of mutating the
    # existing record in place.
    self._instances[key] = dataclasses.replace(
        self._instances[key], state="stopped", idle_since=None
    )
def idle_past_timeout(self, idle_stop_config: dict[str, int]) -> list[ServiceInstance]:
"""
Return instances in 'idle' state whose idle time exceeds their configured timeout.

View file

@ -25,13 +25,13 @@ def test_allocate_creates_allocation(registry):
def test_active_allocations_count(registry):
    """Two allocations made with the same service/node/gpu arguments are both counted."""
    registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "a", 300.0)
    registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "b", 300.0)
    # active_allocations() requires the gpu_id argument; the two-argument
    # form would fail with a TypeError before any assertion ran.
    assert registry.active_allocations("vllm", "heimdall", 0) == 2
def test_release_decrements_count(registry):
    """Releasing the only allocation brings the count for that instance back to zero."""
    alloc = registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "a", 300.0)
    registry.release(alloc.allocation_id)
    # active_allocations() requires the gpu_id argument; the two-argument
    # form would fail with a TypeError before any assertion ran.
    assert registry.active_allocations("vllm", "heimdall", 0) == 0
def test_release_nonexistent_returns_false(registry):