feat(resources): add AgentSupervisor and EvictionEngine
This commit is contained in:
parent
7718911652
commit
cede761d82
3 changed files with 250 additions and 0 deletions
101
circuitforge_core/resources/coordinator/agent_supervisor.py
Normal file
101
circuitforge_core/resources/coordinator/agent_supervisor.py
Normal file
|
|
@ -0,0 +1,101 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
import httpx
|
||||
|
||||
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
|
||||
from circuitforge_core.resources.models import GpuInfo, NodeInfo
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_HEARTBEAT_INTERVAL_S = 10.0
|
||||
_AGENT_TIMEOUT_S = 5.0
|
||||
|
||||
|
||||
@dataclass
|
||||
class AgentRecord:
|
||||
node_id: str
|
||||
agent_url: str
|
||||
last_seen: float = field(default_factory=time.time)
|
||||
gpus: list[GpuInfo] = field(default_factory=list)
|
||||
online: bool = False
|
||||
|
||||
|
||||
class AgentSupervisor:
|
||||
def __init__(self, lease_manager: LeaseManager) -> None:
|
||||
self._agents: dict[str, AgentRecord] = {}
|
||||
self._lease_manager = lease_manager
|
||||
self._running = False
|
||||
|
||||
def register(self, node_id: str, agent_url: str) -> None:
|
||||
if node_id not in self._agents:
|
||||
self._agents[node_id] = AgentRecord(node_id=node_id, agent_url=agent_url)
|
||||
logger.info("Registered agent node: %s @ %s", node_id, agent_url)
|
||||
|
||||
def get_node_info(self, node_id: str) -> NodeInfo | None:
|
||||
record = self._agents.get(node_id)
|
||||
if record is None:
|
||||
return None
|
||||
return NodeInfo(
|
||||
node_id=record.node_id,
|
||||
agent_url=record.agent_url,
|
||||
gpus=record.gpus,
|
||||
last_heartbeat=record.last_seen,
|
||||
)
|
||||
|
||||
def all_nodes(self) -> list[NodeInfo]:
|
||||
return [
|
||||
NodeInfo(
|
||||
node_id=r.node_id,
|
||||
agent_url=r.agent_url,
|
||||
gpus=r.gpus,
|
||||
last_heartbeat=r.last_seen,
|
||||
)
|
||||
for r in self._agents.values()
|
||||
]
|
||||
|
||||
async def poll_agent(self, node_id: str) -> bool:
|
||||
record = self._agents.get(node_id)
|
||||
if record is None:
|
||||
return False
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=_AGENT_TIMEOUT_S) as client:
|
||||
resp = await client.get(f"{record.agent_url}/gpu-info")
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
gpus = [
|
||||
GpuInfo(
|
||||
gpu_id=g["gpu_id"],
|
||||
name=g["name"],
|
||||
vram_total_mb=g["vram_total_mb"],
|
||||
vram_used_mb=g["vram_used_mb"],
|
||||
vram_free_mb=g["vram_free_mb"],
|
||||
)
|
||||
for g in data.get("gpus", [])
|
||||
]
|
||||
record.gpus = gpus
|
||||
record.last_seen = time.time()
|
||||
record.online = True
|
||||
for gpu in gpus:
|
||||
self._lease_manager.register_gpu(node_id, gpu.gpu_id, gpu.vram_total_mb)
|
||||
return True
|
||||
except Exception as exc:
|
||||
logger.warning("Agent %s unreachable: %s", node_id, exc)
|
||||
record.online = False
|
||||
return False
|
||||
|
||||
async def poll_all(self) -> None:
|
||||
await asyncio.gather(*[self.poll_agent(nid) for nid in self._agents])
|
||||
|
||||
async def run_heartbeat_loop(self) -> None:
|
||||
self._running = True
|
||||
while self._running:
|
||||
await self.poll_all()
|
||||
await asyncio.sleep(_HEARTBEAT_INTERVAL_S)
|
||||
|
||||
def stop(self) -> None:
|
||||
self._running = False
|
||||
82
circuitforge_core/resources/coordinator/eviction_engine.py
Normal file
82
circuitforge_core/resources/coordinator/eviction_engine.py
Normal file
|
|
@ -0,0 +1,82 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
import httpx
|
||||
|
||||
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
|
||||
from circuitforge_core.resources.models import VRAMLease
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_DEFAULT_EVICTION_TIMEOUT_S = 10.0
|
||||
|
||||
|
||||
class EvictionEngine:
|
||||
def __init__(
|
||||
self,
|
||||
lease_manager: LeaseManager,
|
||||
eviction_timeout_s: float = _DEFAULT_EVICTION_TIMEOUT_S,
|
||||
) -> None:
|
||||
self.lease_manager = lease_manager
|
||||
self._timeout = eviction_timeout_s
|
||||
|
||||
async def request_lease(
|
||||
self,
|
||||
node_id: str,
|
||||
gpu_id: int,
|
||||
mb: int,
|
||||
service: str,
|
||||
priority: int,
|
||||
agent_url: str,
|
||||
ttl_s: float = 0.0,
|
||||
) -> VRAMLease | None:
|
||||
# Fast path: enough free VRAM
|
||||
lease = await self.lease_manager.try_grant(
|
||||
node_id, gpu_id, mb, service, priority, ttl_s
|
||||
)
|
||||
if lease is not None:
|
||||
return lease
|
||||
|
||||
# Find eviction candidates
|
||||
candidates = self.lease_manager.get_eviction_candidates(
|
||||
node_id=node_id, gpu_id=gpu_id,
|
||||
needed_mb=mb, requester_priority=priority,
|
||||
)
|
||||
if not candidates:
|
||||
logger.info(
|
||||
"No eviction candidates for %s on %s:GPU%d (%dMB needed)",
|
||||
service, node_id, gpu_id, mb,
|
||||
)
|
||||
return None
|
||||
|
||||
# Evict candidates
|
||||
freed_mb = sum(c.mb_granted for c in candidates)
|
||||
logger.info(
|
||||
"Evicting %d lease(s) to free %dMB for %s",
|
||||
len(candidates), freed_mb, service,
|
||||
)
|
||||
for candidate in candidates:
|
||||
await self._evict_lease(candidate, agent_url)
|
||||
|
||||
# Wait for evictions to free up VRAM (poll with timeout)
|
||||
deadline = asyncio.get_event_loop().time() + self._timeout
|
||||
while asyncio.get_event_loop().time() < deadline:
|
||||
lease = await self.lease_manager.try_grant(
|
||||
node_id, gpu_id, mb, service, priority, ttl_s
|
||||
)
|
||||
if lease is not None:
|
||||
return lease
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
logger.warning("Eviction timed out for %s after %.1fs", service, self._timeout)
|
||||
return None
|
||||
|
||||
async def _evict_lease(self, lease: VRAMLease, agent_url: str) -> None:
|
||||
"""Release lease accounting. Process-level eviction deferred to Plan B."""
|
||||
await self.lease_manager.release(lease.lease_id)
|
||||
|
||||
async def _call_agent_evict(self, agent_url: str, lease: VRAMLease) -> bool:
|
||||
"""POST /evict to the agent. Stub for v1 — real process lookup in Plan B."""
|
||||
return True
|
||||
67
tests/test_resources/test_eviction_engine.py
Normal file
67
tests/test_resources/test_eviction_engine.py
Normal file
|
|
@ -0,0 +1,67 @@
|
|||
import asyncio
|
||||
import pytest
|
||||
from unittest.mock import AsyncMock, patch
|
||||
from circuitforge_core.resources.coordinator.eviction_engine import EvictionEngine
|
||||
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def lease_manager():
|
||||
mgr = LeaseManager()
|
||||
mgr.register_gpu("heimdall", 0, 8192)
|
||||
return mgr
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def engine(lease_manager):
|
||||
return EvictionEngine(lease_manager=lease_manager, eviction_timeout_s=0.1)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_request_lease_grants_when_vram_available(engine, lease_manager):
|
||||
lease = await engine.request_lease(
|
||||
node_id="heimdall", gpu_id=0, mb=4096,
|
||||
service="peregrine", priority=1,
|
||||
agent_url="http://localhost:7701",
|
||||
)
|
||||
assert lease is not None
|
||||
assert lease.mb_granted == 4096
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_request_lease_evicts_and_grants(engine, lease_manager):
|
||||
# Pre-fill with a low-priority lease
|
||||
big_lease = await lease_manager.try_grant(
|
||||
"heimdall", 0, 7000, "comfyui", priority=4
|
||||
)
|
||||
assert big_lease is not None
|
||||
|
||||
# Mock the agent eviction call
|
||||
with patch(
|
||||
"circuitforge_core.resources.coordinator.eviction_engine.EvictionEngine._call_agent_evict",
|
||||
new_callable=AsyncMock,
|
||||
) as mock_evict:
|
||||
mock_evict.return_value = True
|
||||
# Simulate the comfyui lease being released (as if the agent evicted it)
|
||||
asyncio.get_event_loop().call_later(
|
||||
0.05, lambda: asyncio.ensure_future(lease_manager.release(big_lease.lease_id))
|
||||
)
|
||||
lease = await engine.request_lease(
|
||||
node_id="heimdall", gpu_id=0, mb=4096,
|
||||
service="peregrine", priority=1,
|
||||
agent_url="http://localhost:7701",
|
||||
)
|
||||
assert lease is not None
|
||||
assert lease.holder_service == "peregrine"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_request_lease_returns_none_when_no_eviction_candidates(engine):
|
||||
await engine.lease_manager.try_grant("heimdall", 0, 6000, "vllm", priority=1)
|
||||
# Requesting 4GB but no lower-priority leases exist
|
||||
lease = await engine.request_lease(
|
||||
node_id="heimdall", gpu_id=0, mb=4096,
|
||||
service="kiwi", priority=2,
|
||||
agent_url="http://localhost:7701",
|
||||
)
|
||||
assert lease is None
|
||||
Loading…
Reference in a new issue