diff --git a/circuitforge_core/resources/coordinator/lease_manager.py b/circuitforge_core/resources/coordinator/lease_manager.py
new file mode 100644
index 0000000..f16259a
--- /dev/null
+++ b/circuitforge_core/resources/coordinator/lease_manager.py
@@ -0,0 +1,88 @@
+from __future__ import annotations
+
+import asyncio
+from collections import defaultdict
+
+from circuitforge_core.resources.models import VRAMLease
+
+
+class LeaseManager:
+    def __init__(self) -> None:
+        self._leases: dict[str, VRAMLease] = {}
+        self._gpu_total: dict[tuple[str, int], int] = {}
+        self._gpu_used: dict[tuple[str, int], int] = defaultdict(int)
+        self._lock = asyncio.Lock()
+
+    def register_gpu(self, node_id: str, gpu_id: int, total_mb: int) -> None:
+        self._gpu_total[(node_id, gpu_id)] = total_mb
+
+    def gpu_total_mb(self, node_id: str, gpu_id: int) -> int:
+        return self._gpu_total.get((node_id, gpu_id), 0)
+
+    def used_mb(self, node_id: str, gpu_id: int) -> int:
+        return self._gpu_used.get((node_id, gpu_id), 0)  # .get: don't let defaultdict insert on a read-only query
+
+    async def try_grant(
+        self,
+        node_id: str,
+        gpu_id: int,
+        mb: int,
+        service: str,
+        priority: int,
+        ttl_s: float = 0.0,
+    ) -> VRAMLease | None:
+        async with self._lock:
+            total = self._gpu_total.get((node_id, gpu_id), 0)
+            used = self._gpu_used[(node_id, gpu_id)]
+            if total - used < mb:
+                return None
+            lease = VRAMLease.create(
+                gpu_id=gpu_id, node_id=node_id, mb=mb,
+                service=service, priority=priority, ttl_s=ttl_s,
+            )
+            self._leases[lease.lease_id] = lease
+            self._gpu_used[(node_id, gpu_id)] += mb
+            return lease
+
+    async def release(self, lease_id: str) -> bool:
+        async with self._lock:
+            lease = self._leases.pop(lease_id, None)
+            if lease is None:
+                return False
+            self._gpu_used[(lease.node_id, lease.gpu_id)] -= lease.mb_granted
+            return True
+
+    def get_eviction_candidates(
+        self,
+        node_id: str,
+        gpu_id: int,
+        needed_mb: int,
+        requester_priority: int,
+    ) -> list[VRAMLease]:
+        candidates = [
+            lease for lease in self._leases.values()
+            if lease.node_id == node_id
+            and lease.gpu_id == gpu_id
+            and lease.priority > requester_priority
+        ]
+        candidates.sort(key=lambda l: l.priority, reverse=True)
+        selected: list[VRAMLease] = []
+        freed = 0
+        for candidate in candidates:
+            selected.append(candidate)
+            freed += candidate.mb_granted
+            if freed >= needed_mb:
+                break
+        return selected
+
+    def list_leases(
+        self, node_id: str | None = None, gpu_id: int | None = None
+    ) -> list[VRAMLease]:
+        return [
+            lease for lease in self._leases.values()
+            if (node_id is None or lease.node_id == node_id)
+            and (gpu_id is None or lease.gpu_id == gpu_id)
+        ]
+
+    def all_leases(self) -> list[VRAMLease]:
+        return list(self._leases.values())
diff --git a/pyproject.toml b/pyproject.toml
index 7215dcb..1ee3704 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -19,3 +19,4 @@ include = ["circuitforge_core*"]
 
 [tool.pytest.ini_options]
 testpaths = ["tests"]
+asyncio_mode = "auto"
diff --git a/tests/test_resources/test_lease_manager.py b/tests/test_resources/test_lease_manager.py
new file mode 100644
index 0000000..0b2385c
--- /dev/null
+++ b/tests/test_resources/test_lease_manager.py
@@ -0,0 +1,85 @@
+import asyncio
+import pytest
+from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
+
+
+@pytest.fixture
+def mgr():
+    m = LeaseManager()
+    m.register_gpu(node_id="heimdall", gpu_id=0, total_mb=8192)
+    return m
+
+
+@pytest.mark.asyncio
+async def test_grant_succeeds_when_vram_available(mgr):
+    lease = await mgr.try_grant(
+        node_id="heimdall", gpu_id=0, mb=4096,
+        service="peregrine", priority=1
+    )
+    assert lease is not None
+    assert lease.mb_granted == 4096
+    assert lease.node_id == "heimdall"
+    assert lease.gpu_id == 0
+
+
+@pytest.mark.asyncio
+async def test_grant_fails_when_vram_insufficient(mgr):
+    await mgr.try_grant(node_id="heimdall", gpu_id=0, mb=7000,
+                        service="vllm", priority=1)
+    lease = await mgr.try_grant(node_id="heimdall", gpu_id=0, mb=2000,
+                                service="kiwi", priority=2)
+    assert lease is None
+
+
+@pytest.mark.asyncio
+async def test_release_frees_vram(mgr):
+    lease = await mgr.try_grant(node_id="heimdall", gpu_id=0, mb=7000,
+                                service="vllm", priority=1)
+    assert lease is not None
+    released = await mgr.release(lease.lease_id)
+    assert released is True
+    lease2 = await mgr.try_grant(node_id="heimdall", gpu_id=0, mb=7000,
+                                 service="comfyui", priority=4)
+    assert lease2 is not None
+
+
+@pytest.mark.asyncio
+async def test_release_unknown_lease_returns_false(mgr):
+    result = await mgr.release("nonexistent-id")
+    assert result is False
+
+
+@pytest.mark.asyncio
+async def test_get_eviction_candidates_returns_lower_priority_leases(mgr):
+    await mgr.try_grant(node_id="heimdall", gpu_id=0, mb=3000,
+                        service="comfyui", priority=4)
+    await mgr.try_grant(node_id="heimdall", gpu_id=0, mb=2000,
+                        service="ollama", priority=1)
+    candidates = mgr.get_eviction_candidates(
+        node_id="heimdall", gpu_id=0,
+        needed_mb=3000, requester_priority=2
+    )
+    assert len(candidates) == 1
+    assert candidates[0].holder_service == "comfyui"
+
+
+@pytest.mark.asyncio
+async def test_list_leases_for_gpu(mgr):
+    await mgr.try_grant(node_id="heimdall", gpu_id=0, mb=1024,
+                        service="peregrine", priority=1)
+    await mgr.try_grant(node_id="heimdall", gpu_id=0, mb=512,
+                        service="kiwi", priority=2)
+    leases = mgr.list_leases(node_id="heimdall", gpu_id=0)
+    assert len(leases) == 2
+
+
+def test_register_gpu_sets_total(mgr):
+    assert mgr.gpu_total_mb("heimdall", 0) == 8192
+
+
+def test_used_mb_tracks_grants():
+    mgr = LeaseManager()
+    mgr.register_gpu("heimdall", 0, 8192)
+    asyncio.run(mgr.try_grant("heimdall", 0, 3000, "a", 1))
+    asyncio.run(mgr.try_grant("heimdall", 0, 2000, "b", 2))
+    assert mgr.used_mb("heimdall", 0) == 5000