From 9754f522d98ea5493140fa0c065d36147a9cdcf6 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Thu, 2 Apr 2026 12:22:46 -0700 Subject: [PATCH] =?UTF-8?q?feat(orch):=20add=20ServiceRegistry=20=E2=80=94?= =?UTF-8?q?=20allocation=20tracking=20+=20idle=20state=20machine?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../resources/coordinator/service_registry.py | 140 ++++++++++++++++++ tests/test_resources/test_service_registry.py | 63 ++++++++ 2 files changed, 203 insertions(+) create mode 100644 circuitforge_core/resources/coordinator/service_registry.py create mode 100644 tests/test_resources/test_service_registry.py diff --git a/circuitforge_core/resources/coordinator/service_registry.py b/circuitforge_core/resources/coordinator/service_registry.py new file mode 100644 index 0000000..3d0df1b --- /dev/null +++ b/circuitforge_core/resources/coordinator/service_registry.py @@ -0,0 +1,140 @@ +from __future__ import annotations + +import dataclasses +import time +import uuid +from dataclasses import dataclass +from typing import Literal + + +@dataclass +class ServiceAllocation: + allocation_id: str + service: str + node_id: str + gpu_id: int + model: str | None + caller: str + url: str + created_at: float + expires_at: float # 0 = no expiry + + +@dataclass +class ServiceInstance: + service: str + node_id: str + gpu_id: int + state: Literal["starting", "running", "idle", "stopped"] + model: str | None + url: str | None + idle_since: float | None = None + + +class ServiceRegistry: + """ + In-memory registry of service allocations and instance state. + + Allocations: per-caller request — many per service instance. + Instances: per (service, node_id, gpu_id) — one per running container. + """ + + def __init__(self) -> None: + self._allocations: dict[str, ServiceAllocation] = {} + self._instances: dict[str, ServiceInstance] = {} # key: "service:node_id:gpu_id" + + # ── allocation API ──────────────────────────────────────────────── + + def allocate( + self, + service: str, + node_id: str, + gpu_id: int, + model: str | None, + url: str, + caller: str, + ttl_s: float, + ) -> ServiceAllocation: + alloc = ServiceAllocation( + allocation_id=str(uuid.uuid4()), + service=service, + node_id=node_id, + gpu_id=gpu_id, + model=model, + caller=caller, + url=url, + created_at=time.time(), + expires_at=time.time() + ttl_s if ttl_s > 0 else 0.0, + ) + self._allocations[alloc.allocation_id] = alloc + + # If an instance exists in idle/stopped state, mark it running again + key = f"{service}:{node_id}:{gpu_id}" + if key in self._instances: + inst = self._instances[key] + if inst.state in ("idle", "stopped"): + self._instances[key] = dataclasses.replace( + inst, state="running", idle_since=None + ) + return alloc + + def release(self, allocation_id: str) -> bool: + alloc = self._allocations.pop(allocation_id, None) + if alloc is None: + return False + # If no active allocations remain for this instance, mark it idle + key = f"{alloc.service}:{alloc.node_id}:{alloc.gpu_id}" + if self.active_allocations(alloc.service, alloc.node_id) == 0: + if key in self._instances: + self._instances[key] = dataclasses.replace( + self._instances[key], state="idle", idle_since=time.time() + ) + return True + + def active_allocations(self, service: str, node_id: str) -> int: + return sum( + 1 for a in self._allocations.values() + if a.service == service and a.node_id == node_id + ) + + # ── instance API ───────────────────────────────────────────────── + + def upsert_instance( + self, + service: str, + node_id: str, + gpu_id: int, + state: Literal["starting", "running", "idle", "stopped"], + model: str | None, + url: str | None, + ) -> ServiceInstance: + key = f"{service}:{node_id}:{gpu_id}" + existing = self._instances.get(key) + idle_since: float | None = None + if state == "idle": + # Preserve idle_since if already idle; set now if transitioning into idle + idle_since = existing.idle_since if (existing and existing.state == "idle") else time.time() + inst = ServiceInstance( + service=service, node_id=node_id, gpu_id=gpu_id, + state=state, model=model, url=url, idle_since=idle_since, + ) + self._instances[key] = inst + return inst + + def all_instances(self) -> list[ServiceInstance]: + return list(self._instances.values()) + + def idle_past_timeout(self, idle_stop_config: dict[str, int]) -> list[ServiceInstance]: + """ + Return instances in 'idle' state whose idle time exceeds their configured timeout. + idle_stop_config: {service_name: seconds} — 0 means never stop automatically. + """ + now = time.time() + result = [] + for inst in self._instances.values(): + if inst.state != "idle" or inst.idle_since is None: + continue + timeout = idle_stop_config.get(inst.service, 0) + if timeout > 0 and (now - inst.idle_since) >= timeout: + result.append(inst) + return result diff --git a/tests/test_resources/test_service_registry.py b/tests/test_resources/test_service_registry.py new file mode 100644 index 0000000..aefb34d --- /dev/null +++ b/tests/test_resources/test_service_registry.py @@ -0,0 +1,63 @@ +import time +import dataclasses +import pytest +from circuitforge_core.resources.coordinator.service_registry import ( + ServiceRegistry, ServiceAllocation, ServiceInstance, +) + + +@pytest.fixture +def registry(): + return ServiceRegistry() + + +def test_allocate_creates_allocation(registry): + alloc = registry.allocate( + service="vllm", node_id="heimdall", gpu_id=0, + model="Ouro-1.4B", url="http://heimdall:8000", + caller="test", ttl_s=300.0, + ) + assert alloc.service == "vllm" + assert alloc.node_id == "heimdall" + assert alloc.allocation_id # non-empty UUID string + + +def test_active_allocations_count(registry): + registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "a", 300.0) + registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "b", 300.0) + assert registry.active_allocations("vllm", "heimdall") == 2 + + +def test_release_decrements_count(registry): + alloc = registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "a", 300.0) + registry.release(alloc.allocation_id) + assert registry.active_allocations("vllm", "heimdall") == 0 + + +def test_release_nonexistent_returns_false(registry): + assert registry.release("nonexistent-id") is False + + +def test_upsert_instance_sets_running_state(registry): + registry.upsert_instance("vllm", "heimdall", 0, state="running", + model="Ouro-1.4B", url="http://heimdall:8000") + instances = registry.all_instances() + assert len(instances) == 1 + assert instances[0].state == "running" + + +def test_release_last_alloc_marks_instance_idle(registry): + registry.upsert_instance("vllm", "heimdall", 0, state="running", + model="Ouro-1.4B", url="http://heimdall:8000") + alloc = registry.allocate("vllm", "heimdall", 0, "Ouro-1.4B", "http://heimdall:8000", "a", 300.0) + registry.release(alloc.allocation_id) + instance = registry.all_instances()[0] + assert instance.state == "idle" + assert instance.idle_since is not None + + +def test_new_alloc_on_idle_instance_marks_it_running(registry): + registry.upsert_instance("vllm", "heimdall", 0, state="idle", + model="M", url="http://h:8000") + registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "x", 300.0) + assert registry.all_instances()[0].state == "running"