From cede761d826275a946a98c44334b4e15e87e45e0 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Mon, 30 Mar 2026 21:44:42 -0700 Subject: [PATCH] feat(resources): add AgentSupervisor and EvictionEngine --- .../resources/coordinator/agent_supervisor.py | 101 ++++++++++++++++++ .../resources/coordinator/eviction_engine.py | 82 ++++++++++++++ tests/test_resources/test_eviction_engine.py | 67 ++++++++++++ 3 files changed, 250 insertions(+) create mode 100644 circuitforge_core/resources/coordinator/agent_supervisor.py create mode 100644 circuitforge_core/resources/coordinator/eviction_engine.py create mode 100644 tests/test_resources/test_eviction_engine.py diff --git a/circuitforge_core/resources/coordinator/agent_supervisor.py b/circuitforge_core/resources/coordinator/agent_supervisor.py new file mode 100644 index 0000000..63d92e7 --- /dev/null +++ b/circuitforge_core/resources/coordinator/agent_supervisor.py @@ -0,0 +1,101 @@ +from __future__ import annotations + +import asyncio +import logging +import time +from dataclasses import dataclass, field + +import httpx + +from circuitforge_core.resources.coordinator.lease_manager import LeaseManager +from circuitforge_core.resources.models import GpuInfo, NodeInfo + +logger = logging.getLogger(__name__) + +_HEARTBEAT_INTERVAL_S = 10.0 +_AGENT_TIMEOUT_S = 5.0 + + +@dataclass +class AgentRecord: + node_id: str + agent_url: str + last_seen: float = field(default_factory=time.time) + gpus: list[GpuInfo] = field(default_factory=list) + online: bool = False + + +class AgentSupervisor: + def __init__(self, lease_manager: LeaseManager) -> None: + self._agents: dict[str, AgentRecord] = {} + self._lease_manager = lease_manager + self._running = False + + def register(self, node_id: str, agent_url: str) -> None: + if node_id not in self._agents: + self._agents[node_id] = AgentRecord(node_id=node_id, agent_url=agent_url) + logger.info("Registered agent node: %s @ %s", node_id, agent_url) + + def get_node_info(self, node_id: str) -> NodeInfo | None: + record = self._agents.get(node_id) + if record is None: + return None + return NodeInfo( + node_id=record.node_id, + agent_url=record.agent_url, + gpus=record.gpus, + last_heartbeat=record.last_seen, + ) + + def all_nodes(self) -> list[NodeInfo]: + return [ + NodeInfo( + node_id=r.node_id, + agent_url=r.agent_url, + gpus=r.gpus, + last_heartbeat=r.last_seen, + ) + for r in self._agents.values() + ] + + async def poll_agent(self, node_id: str) -> bool: + record = self._agents.get(node_id) + if record is None: + return False + try: + async with httpx.AsyncClient(timeout=_AGENT_TIMEOUT_S) as client: + resp = await client.get(f"{record.agent_url}/gpu-info") + resp.raise_for_status() + data = resp.json() + gpus = [ + GpuInfo( + gpu_id=g["gpu_id"], + name=g["name"], + vram_total_mb=g["vram_total_mb"], + vram_used_mb=g["vram_used_mb"], + vram_free_mb=g["vram_free_mb"], + ) + for g in data.get("gpus", []) + ] + record.gpus = gpus + record.last_seen = time.time() + record.online = True + for gpu in gpus: + self._lease_manager.register_gpu(node_id, gpu.gpu_id, gpu.vram_total_mb) + return True + except Exception as exc: + logger.warning("Agent %s unreachable: %s", node_id, exc) + record.online = False + return False + + async def poll_all(self) -> None: + await asyncio.gather(*[self.poll_agent(nid) for nid in self._agents]) + + async def run_heartbeat_loop(self) -> None: + self._running = True + while self._running: + await self.poll_all() + await asyncio.sleep(_HEARTBEAT_INTERVAL_S) + + def stop(self) -> None: + self._running = False diff --git a/circuitforge_core/resources/coordinator/eviction_engine.py b/circuitforge_core/resources/coordinator/eviction_engine.py new file mode 100644 index 0000000..880c0e9 --- /dev/null +++ b/circuitforge_core/resources/coordinator/eviction_engine.py @@ -0,0 +1,82 @@ +from __future__ import annotations + +import asyncio +import logging + +import httpx + +from circuitforge_core.resources.coordinator.lease_manager import LeaseManager +from circuitforge_core.resources.models import VRAMLease + +logger = logging.getLogger(__name__) + +_DEFAULT_EVICTION_TIMEOUT_S = 10.0 + + +class EvictionEngine: + def __init__( + self, + lease_manager: LeaseManager, + eviction_timeout_s: float = _DEFAULT_EVICTION_TIMEOUT_S, + ) -> None: + self.lease_manager = lease_manager + self._timeout = eviction_timeout_s + + async def request_lease( + self, + node_id: str, + gpu_id: int, + mb: int, + service: str, + priority: int, + agent_url: str, + ttl_s: float = 0.0, + ) -> VRAMLease | None: + # Fast path: enough free VRAM + lease = await self.lease_manager.try_grant( + node_id, gpu_id, mb, service, priority, ttl_s + ) + if lease is not None: + return lease + + # Find eviction candidates + candidates = self.lease_manager.get_eviction_candidates( + node_id=node_id, gpu_id=gpu_id, + needed_mb=mb, requester_priority=priority, + ) + if not candidates: + logger.info( + "No eviction candidates for %s on %s:GPU%d (%dMB needed)", + service, node_id, gpu_id, mb, + ) + return None + + # Evict candidates + freed_mb = sum(c.mb_granted for c in candidates) + logger.info( + "Evicting %d lease(s) to free %dMB for %s", + len(candidates), freed_mb, service, + ) + for candidate in candidates: + await self._evict_lease(candidate, agent_url) + + # Wait for evictions to free up VRAM (poll with timeout) + deadline = asyncio.get_event_loop().time() + self._timeout + while asyncio.get_event_loop().time() < deadline: + lease = await self.lease_manager.try_grant( + node_id, gpu_id, mb, service, priority, ttl_s + ) + if lease is not None: + return lease + await asyncio.sleep(0.1) + + logger.warning("Eviction timed out for %s after %.1fs", service, self._timeout) + return None + + async def _evict_lease(self, lease: VRAMLease, agent_url: str) -> None: + """Release lease accounting. Process-level eviction deferred to Plan B.""" + await self.lease_manager.release(lease.lease_id) + + async def _call_agent_evict(self, agent_url: str, lease: VRAMLease) -> bool: + """POST /evict to the agent. Stub for v1 — real process lookup in Plan B.""" + return True diff --git a/tests/test_resources/test_eviction_engine.py b/tests/test_resources/test_eviction_engine.py new file mode 100644 index 0000000..d7051e3 --- /dev/null +++ b/tests/test_resources/test_eviction_engine.py @@ -0,0 +1,67 @@ +import asyncio +import pytest +from unittest.mock import AsyncMock, patch +from circuitforge_core.resources.coordinator.eviction_engine import EvictionEngine +from circuitforge_core.resources.coordinator.lease_manager import LeaseManager + + +@pytest.fixture +def lease_manager(): + mgr = LeaseManager() + mgr.register_gpu("heimdall", 0, 8192) + return mgr + + +@pytest.fixture +def engine(lease_manager): + return EvictionEngine(lease_manager=lease_manager, eviction_timeout_s=0.1) + + +@pytest.mark.asyncio +async def test_request_lease_grants_when_vram_available(engine, lease_manager): + lease = await engine.request_lease( + node_id="heimdall", gpu_id=0, mb=4096, + service="peregrine", priority=1, + agent_url="http://localhost:7701", + ) + assert lease is not None + assert lease.mb_granted == 4096 + + +@pytest.mark.asyncio +async def test_request_lease_evicts_and_grants(engine, lease_manager): + # Pre-fill with a low-priority lease + big_lease = await lease_manager.try_grant( + "heimdall", 0, 7000, "comfyui", priority=4 + ) + assert big_lease is not None + + # Mock the agent eviction call + with patch( + "circuitforge_core.resources.coordinator.eviction_engine.EvictionEngine._call_agent_evict", + new_callable=AsyncMock, + ) as mock_evict: + mock_evict.return_value = True + # Simulate the comfyui lease being released (as if the agent evicted it) + asyncio.get_event_loop().call_later( + 0.05, lambda: asyncio.ensure_future(lease_manager.release(big_lease.lease_id)) + ) + lease = await engine.request_lease( + node_id="heimdall", gpu_id=0, mb=4096, + service="peregrine", priority=1, + agent_url="http://localhost:7701", + ) + assert lease is not None + assert lease.holder_service == "peregrine" + + +@pytest.mark.asyncio +async def test_request_lease_returns_none_when_no_eviction_candidates(engine): + await engine.lease_manager.try_grant("heimdall", 0, 6000, "vllm", priority=1) + # Requesting 4GB but no lower-priority leases exist + lease = await engine.request_lease( + node_id="heimdall", gpu_id=0, mb=4096, + service="kiwi", priority=2, + agent_url="http://localhost:7701", + ) + assert lease is None