feat(orch): add ServiceRegistry — allocation tracking + idle state machine

This commit is contained in:
pyr0ball 2026-04-02 12:22:46 -07:00
parent 17a24173f7
commit 9754f522d9
2 changed files with 203 additions and 0 deletions

View file

@ -0,0 +1,140 @@
from __future__ import annotations
import dataclasses
import time
import uuid
from dataclasses import dataclass
from typing import Literal
@dataclass
class ServiceAllocation:
allocation_id: str
service: str
node_id: str
gpu_id: int
model: str | None
caller: str
url: str
created_at: float
expires_at: float # 0 = no expiry
@dataclass
class ServiceInstance:
service: str
node_id: str
gpu_id: int
state: Literal["starting", "running", "idle", "stopped"]
model: str | None
url: str | None
idle_since: float | None = None
class ServiceRegistry:
"""
In-memory registry of service allocations and instance state.
Allocations: per-caller request many per service instance.
Instances: per (service, node_id, gpu_id) one per running container.
"""
def __init__(self) -> None:
self._allocations: dict[str, ServiceAllocation] = {}
self._instances: dict[str, ServiceInstance] = {} # key: "service:node_id:gpu_id"
# ── allocation API ────────────────────────────────────────────────
def allocate(
self,
service: str,
node_id: str,
gpu_id: int,
model: str | None,
url: str,
caller: str,
ttl_s: float,
) -> ServiceAllocation:
alloc = ServiceAllocation(
allocation_id=str(uuid.uuid4()),
service=service,
node_id=node_id,
gpu_id=gpu_id,
model=model,
caller=caller,
url=url,
created_at=time.time(),
expires_at=time.time() + ttl_s if ttl_s > 0 else 0.0,
)
self._allocations[alloc.allocation_id] = alloc
# If an instance exists in idle/stopped state, mark it running again
key = f"{service}:{node_id}:{gpu_id}"
if key in self._instances:
inst = self._instances[key]
if inst.state in ("idle", "stopped"):
self._instances[key] = dataclasses.replace(
inst, state="running", idle_since=None
)
return alloc
def release(self, allocation_id: str) -> bool:
alloc = self._allocations.pop(allocation_id, None)
if alloc is None:
return False
# If no active allocations remain for this instance, mark it idle
key = f"{alloc.service}:{alloc.node_id}:{alloc.gpu_id}"
if self.active_allocations(alloc.service, alloc.node_id) == 0:
if key in self._instances:
self._instances[key] = dataclasses.replace(
self._instances[key], state="idle", idle_since=time.time()
)
return True
def active_allocations(self, service: str, node_id: str) -> int:
return sum(
1 for a in self._allocations.values()
if a.service == service and a.node_id == node_id
)
# ── instance API ─────────────────────────────────────────────────
def upsert_instance(
self,
service: str,
node_id: str,
gpu_id: int,
state: Literal["starting", "running", "idle", "stopped"],
model: str | None,
url: str | None,
) -> ServiceInstance:
key = f"{service}:{node_id}:{gpu_id}"
existing = self._instances.get(key)
idle_since: float | None = None
if state == "idle":
# Preserve idle_since if already idle; set now if transitioning into idle
idle_since = existing.idle_since if (existing and existing.state == "idle") else time.time()
inst = ServiceInstance(
service=service, node_id=node_id, gpu_id=gpu_id,
state=state, model=model, url=url, idle_since=idle_since,
)
self._instances[key] = inst
return inst
def all_instances(self) -> list[ServiceInstance]:
return list(self._instances.values())
def idle_past_timeout(self, idle_stop_config: dict[str, int]) -> list[ServiceInstance]:
"""
Return instances in 'idle' state whose idle time exceeds their configured timeout.
idle_stop_config: {service_name: seconds} 0 means never stop automatically.
"""
now = time.time()
result = []
for inst in self._instances.values():
if inst.state != "idle" or inst.idle_since is None:
continue
timeout = idle_stop_config.get(inst.service, 0)
if timeout > 0 and (now - inst.idle_since) >= timeout:
result.append(inst)
return result

View file

@ -0,0 +1,63 @@
import time
import dataclasses
import pytest
from circuitforge_core.resources.coordinator.service_registry import (
ServiceRegistry, ServiceAllocation, ServiceInstance,
)
@pytest.fixture
def registry():
return ServiceRegistry()
def test_allocate_creates_allocation(registry):
alloc = registry.allocate(
service="vllm", node_id="heimdall", gpu_id=0,
model="Ouro-1.4B", url="http://heimdall:8000",
caller="test", ttl_s=300.0,
)
assert alloc.service == "vllm"
assert alloc.node_id == "heimdall"
assert alloc.allocation_id # non-empty UUID string
def test_active_allocations_count(registry):
registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "a", 300.0)
registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "b", 300.0)
assert registry.active_allocations("vllm", "heimdall") == 2
def test_release_decrements_count(registry):
alloc = registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "a", 300.0)
registry.release(alloc.allocation_id)
assert registry.active_allocations("vllm", "heimdall") == 0
def test_release_nonexistent_returns_false(registry):
assert registry.release("nonexistent-id") is False
def test_upsert_instance_sets_running_state(registry):
registry.upsert_instance("vllm", "heimdall", 0, state="running",
model="Ouro-1.4B", url="http://heimdall:8000")
instances = registry.all_instances()
assert len(instances) == 1
assert instances[0].state == "running"
def test_release_last_alloc_marks_instance_idle(registry):
registry.upsert_instance("vllm", "heimdall", 0, state="running",
model="Ouro-1.4B", url="http://heimdall:8000")
alloc = registry.allocate("vllm", "heimdall", 0, "Ouro-1.4B", "http://heimdall:8000", "a", 300.0)
registry.release(alloc.allocation_id)
instance = registry.all_instances()[0]
assert instance.state == "idle"
assert instance.idle_since is not None
def test_new_alloc_on_idle_instance_marks_it_running(registry):
registry.upsert_instance("vllm", "heimdall", 0, state="idle",
model="M", url="http://h:8000")
registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "x", 300.0)
assert registry.all_instances()[0].state == "running"