feat(resources): add LeaseManager with VRAM tracking and eviction candidate selection
This commit is contained in:
parent
cdd8072b32
commit
d60503f059
3 changed files with 174 additions and 0 deletions
88
circuitforge_core/resources/coordinator/lease_manager.py
Normal file
88
circuitforge_core/resources/coordinator/lease_manager.py
Normal file
|
|
@ -0,0 +1,88 @@
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
from collections import defaultdict
|
||||||
|
|
||||||
|
from circuitforge_core.resources.models import VRAMLease
|
||||||
|
|
||||||
|
|
||||||
|
class LeaseManager:
    """Track VRAM leases and per-GPU usage.

    A GPU is identified by the pair ``(node_id, gpu_id)``. Capacity is
    recorded with :meth:`register_gpu`; :meth:`try_grant` and
    :meth:`release` update the used-MB counter while holding an
    ``asyncio.Lock``. The synchronous accessors read the current state
    without taking the lock.
    """

    def __init__(self) -> None:
        # lease_id -> lease, in grant order (dict preserves insertion order).
        self._leases: dict[str, VRAMLease] = {}
        # (node_id, gpu_id) -> registered capacity in MB.
        self._gpu_total: dict[tuple[str, int], int] = {}
        # (node_id, gpu_id) -> MB currently leased. A defaultdict so that
        # ``+=`` / ``-=`` work on first use; plain reads go through ``.get``
        # so that they never insert a key as a side effect.
        self._gpu_used: dict[tuple[str, int], int] = defaultdict(int)
        self._lock = asyncio.Lock()

    def register_gpu(self, node_id: str, gpu_id: int, total_mb: int) -> None:
        """Record (or overwrite) the total VRAM capacity of a GPU, in MB."""
        self._gpu_total[(node_id, gpu_id)] = total_mb

    def gpu_total_mb(self, node_id: str, gpu_id: int) -> int:
        """Return the registered capacity in MB, or 0 for an unknown GPU."""
        return self._gpu_total.get((node_id, gpu_id), 0)

    def used_mb(self, node_id: str, gpu_id: int) -> int:
        """Return the MB currently leased on a GPU, or 0 if none are."""
        # ``.get`` instead of indexing: indexing a defaultdict would create
        # an entry for every GPU that is merely queried.
        return self._gpu_used.get((node_id, gpu_id), 0)

    async def try_grant(
        self,
        node_id: str,
        gpu_id: int,
        mb: int,
        service: str,
        priority: int,
        ttl_s: float = 0.0,
    ) -> VRAMLease | None:
        """Grant a lease of ``mb`` MB if the GPU has enough free VRAM.

        Free VRAM is the registered total minus the MB already leased; an
        unregistered GPU has a total of 0.

        Args:
            node_id: Node that hosts the GPU.
            gpu_id: GPU index on that node.
            mb: Amount of VRAM requested, in MB.
            service: Name of the requesting service, forwarded to the lease.
            priority: Priority of the request, forwarded to the lease.
            ttl_s: Forwarded to ``VRAMLease.create``; no expiry is enforced
                in this class.

        Returns:
            The new lease, or ``None`` when free VRAM is less than ``mb``.
        """
        key = (node_id, gpu_id)
        async with self._lock:
            free = self._gpu_total.get(key, 0) - self._gpu_used.get(key, 0)
            if free < mb:
                return None
            # NOTE(review): a negative ``mb`` passes the check above and
            # would lower the usage counter - confirm callers validate it.
            lease = VRAMLease.create(
                gpu_id=gpu_id,
                node_id=node_id,
                mb=mb,
                service=service,
                priority=priority,
                ttl_s=ttl_s,
            )
            self._leases[lease.lease_id] = lease
            # NOTE(review): usage grows by ``mb`` but release() subtracts
            # ``lease.mb_granted``; presumably these are equal - confirm.
            self._gpu_used[key] += mb
            return lease

    async def release(self, lease_id: str) -> bool:
        """Drop a lease and return its VRAM to the GPU's free pool.

        Returns:
            ``True`` if the lease existed and was removed, ``False`` if
            ``lease_id`` is unknown.
        """
        async with self._lock:
            lease = self._leases.pop(lease_id, None)
            if lease is None:
                return False
            self._gpu_used[(lease.node_id, lease.gpu_id)] -= lease.mb_granted
            return True

    def get_eviction_candidates(
        self,
        node_id: str,
        gpu_id: int,
        needed_mb: int,
        requester_priority: int,
    ) -> list[VRAMLease]:
        """Pick leases on a GPU that could be evicted to free ``needed_mb``.

        Only leases whose ``priority`` value is strictly greater than
        ``requester_priority`` are eligible. They are taken in descending
        ``priority`` order (ties keep grant order) until their combined
        ``mb_granted`` reaches ``needed_mb``.

        Returns:
            The selected leases. Empty when ``needed_mb`` is not positive
            or nothing is eligible. If the eligible leases together free
            less than ``needed_mb``, all of them are returned, so callers
            should check the total before evicting.
        """
        if needed_mb <= 0:
            # Nothing has to be freed, so no lease should be nominated.
            return []

        eligible = [
            lease
            for lease in self._leases.values()
            if lease.node_id == node_id
            and lease.gpu_id == gpu_id
            and lease.priority > requester_priority
        ]
        eligible.sort(key=lambda lease: lease.priority, reverse=True)

        selected: list[VRAMLease] = []
        freed = 0
        for lease in eligible:
            selected.append(lease)
            freed += lease.mb_granted
            if freed >= needed_mb:
                break
        return selected

    def list_leases(
        self, node_id: str | None = None, gpu_id: int | None = None
    ) -> list[VRAMLease]:
        """Return active leases, optionally filtered by node and/or GPU.

        A filter left as ``None`` matches every lease.
        """
        return [
            lease
            for lease in self._leases.values()
            if (node_id is None or lease.node_id == node_id)
            and (gpu_id is None or lease.gpu_id == gpu_id)
        ]

    def all_leases(self) -> list[VRAMLease]:
        """Return a new list containing every active lease."""
        return list(self._leases.values())
|
||||||
|
|
@ -19,3 +19,4 @@ include = ["circuitforge_core*"]
|
||||||
|
|
||||||
[tool.pytest.ini_options]
|
[tool.pytest.ini_options]
|
||||||
testpaths = ["tests"]
|
testpaths = ["tests"]
|
||||||
|
asyncio_mode = "auto"
|
||||||
|
|
|
||||||
85
tests/test_resources/test_lease_manager.py
Normal file
85
tests/test_resources/test_lease_manager.py
Normal file
|
|
@ -0,0 +1,85 @@
|
||||||
|
import asyncio
|
||||||
|
import pytest
|
||||||
|
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
def mgr():
    """Provide a LeaseManager with one 8192 MB GPU (heimdall, GPU 0)."""
    manager = LeaseManager()
    manager.register_gpu(node_id="heimdall", gpu_id=0, total_mb=8192)
    return manager
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_grant_succeeds_when_vram_available(mgr):
    """A request that fits in free VRAM yields a lease echoing the request."""
    granted = await mgr.try_grant(
        node_id="heimdall",
        gpu_id=0,
        mb=4096,
        service="peregrine",
        priority=1,
    )
    assert granted is not None
    assert granted.mb_granted == 4096
    assert granted.node_id == "heimdall"
    assert granted.gpu_id == 0
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_grant_fails_when_vram_insufficient(mgr):
    """A second request exceeding the remaining VRAM is refused with None."""
    gpu = {"node_id": "heimdall", "gpu_id": 0}
    # 7000 of 8192 MB taken first, so 2000 MB more cannot fit.
    await mgr.try_grant(**gpu, mb=7000, service="vllm", priority=1)
    refused = await mgr.try_grant(**gpu, mb=2000, service="kiwi", priority=2)
    assert refused is None
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_release_frees_vram(mgr):
    """Releasing a lease makes its VRAM available to a later request."""
    gpu = {"node_id": "heimdall", "gpu_id": 0}

    first = await mgr.try_grant(**gpu, mb=7000, service="vllm", priority=1)
    assert first is not None

    was_released = await mgr.release(first.lease_id)
    assert was_released is True

    # The same 7000 MB only fits again if the first lease was really freed.
    second = await mgr.try_grant(**gpu, mb=7000, service="comfyui", priority=4)
    assert second is not None
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_release_unknown_lease_returns_false(mgr):
    """Releasing an id that was never granted reports False."""
    outcome = await mgr.release("nonexistent-id")
    assert outcome is False
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_get_eviction_candidates_returns_lower_priority_leases(mgr):
    """Only leases with a priority value above the requester's are offered."""
    holders = (("comfyui", 3000, 4), ("ollama", 2000, 1))
    for service, mb, priority in holders:
        await mgr.try_grant(
            node_id="heimdall",
            gpu_id=0,
            mb=mb,
            service=service,
            priority=priority,
        )

    evictable = mgr.get_eviction_candidates(
        node_id="heimdall",
        gpu_id=0,
        needed_mb=3000,
        requester_priority=2,
    )
    assert len(evictable) == 1
    assert evictable[0].holder_service == "comfyui"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_list_leases_for_gpu(mgr):
    """list_leases returns every lease granted on the requested GPU."""
    for service, mb, priority in (("peregrine", 1024, 1), ("kiwi", 512, 2)):
        await mgr.try_grant(
            node_id="heimdall",
            gpu_id=0,
            mb=mb,
            service=service,
            priority=priority,
        )

    on_gpu = mgr.list_leases(node_id="heimdall", gpu_id=0)
    assert len(on_gpu) == 2
|
||||||
|
|
||||||
|
|
||||||
|
def test_register_gpu_sets_total(mgr):
    """The capacity registered by the fixture is readable via gpu_total_mb."""
    registered_total = mgr.gpu_total_mb("heimdall", 0)
    assert registered_total == 8192
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_used_mb_tracks_grants(mgr):
    """used_mb reports the sum of all leases granted on the GPU.

    Runs inside a single event loop using the shared ``mgr`` fixture, like
    the other async tests, instead of driving one manager (and its
    ``asyncio.Lock``) through two separate ``asyncio.run`` loops.
    """
    first = await mgr.try_grant("heimdall", 0, 3000, "a", 1)
    second = await mgr.try_grant("heimdall", 0, 2000, "b", 2)
    # Both requests fit in 8192 MB; fail here rather than on the sum below.
    assert first is not None
    assert second is not None
    assert mgr.used_mb("heimdall", 0) == 5000
|
||||||
Loading…
Reference in a new issue