From c299482e0df8090a3370d5c2c7ce8d943f2f21ad Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Thu, 2 Apr 2026 12:30:28 -0700 Subject: [PATCH] feat: add idle sweep to AgentSupervisor --- circuitforge_core/resources/cli.py | 26 +++++- .../resources/coordinator/agent_supervisor.py | 53 ++++++++++- tests/test_resources/test_agent_supervisor.py | 93 +++++++++++++++++++ 3 files changed, 168 insertions(+), 4 deletions(-) create mode 100644 tests/test_resources/test_agent_supervisor.py diff --git a/circuitforge_core/resources/cli.py b/circuitforge_core/resources/cli.py index 9e65f08..dc6883f 100644 --- a/circuitforge_core/resources/cli.py +++ b/circuitforge_core/resources/cli.py @@ -44,11 +44,17 @@ def start( from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor from circuitforge_core.resources.coordinator.app import create_coordinator_app + from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry from circuitforge_core.resources.agent.gpu_monitor import GpuMonitor lease_manager = LeaseManager() profile_registry = ProfileRegistry() - supervisor = AgentSupervisor(lease_manager=lease_manager) + service_registry = ServiceRegistry() + supervisor = AgentSupervisor( + lease_manager=lease_manager, + service_registry=service_registry, + profile_registry=profile_registry, + ) monitor = GpuMonitor() gpus = monitor.poll() @@ -79,6 +85,7 @@ def start( lease_manager=lease_manager, profile_registry=profile_registry, agent_supervisor=supervisor, + service_registry=service_registry, ) typer.echo(f"Starting cf-orch coordinator on {host}:{port}") @@ -92,6 +99,7 @@ def agent( host: str = "0.0.0.0", port: int = 7701, advertise_host: Optional[str] = None, + profile: Annotated[Optional[Path], typer.Option(help="Profile YAML path")] = None, ) -> None: """Start a cf-orch node agent and self-register with the coordinator. @@ -101,10 +109,11 @@ def agent( Use --advertise-host to override the IP the coordinator should use to reach this agent (e.g. on a multi-homed or NATted host). """ - import asyncio import threading import httpx from circuitforge_core.resources.agent.app import create_agent_app + from circuitforge_core.resources.agent.service_manager import ServiceManager + from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry # The URL the coordinator should use to reach this agent. reach_host = advertise_host or ("127.0.0.1" if host in ("0.0.0.0", "::") else host) @@ -132,7 +141,18 @@ def agent( # Fire registration in a daemon thread so uvicorn.run() can start blocking immediately. threading.Thread(target=_register_in_background, daemon=True).start() - agent_app = create_agent_app(node_id=node_id) + service_manager = None + try: + from circuitforge_core.resources.agent.gpu_monitor import GpuMonitor + pr = ProfileRegistry() + gpus = GpuMonitor().poll() + p = pr.load(Path(profile)) if profile else pr.auto_detect(gpus) + service_manager = ServiceManager(node_id=node_id, profile=p, advertise_host=reach_host) + typer.echo(f"ServiceManager ready with profile: {p.name}") + except Exception as exc: + typer.echo(f"Warning: ServiceManager unavailable ({exc})", err=True) + + agent_app = create_agent_app(node_id=node_id, service_manager=service_manager) typer.echo(f"Starting cf-orch agent [{node_id}] on {host}:{port}") uvicorn.run(agent_app, host=host, port=port) diff --git a/circuitforge_core/resources/coordinator/agent_supervisor.py b/circuitforge_core/resources/coordinator/agent_supervisor.py index 5eb4f15..6db65ac 100644 --- a/circuitforge_core/resources/coordinator/agent_supervisor.py +++ b/circuitforge_core/resources/coordinator/agent_supervisor.py @@ -8,6 +8,8 @@ from dataclasses import dataclass, field import httpx from circuitforge_core.resources.coordinator.lease_manager import LeaseManager +from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry +from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry from circuitforge_core.resources.models import GpuInfo, NodeInfo, ResidentAllocation logger = logging.getLogger(__name__) @@ -26,10 +28,18 @@ class AgentRecord: class AgentSupervisor: - def __init__(self, lease_manager: LeaseManager) -> None: + def __init__( + self, + lease_manager: LeaseManager, + service_registry: ServiceRegistry | None = None, + profile_registry: ProfileRegistry | None = None, + ) -> None: self._agents: dict[str, AgentRecord] = {} self._lease_manager = lease_manager self._running = False + self._service_registry = service_registry + self._profile_registry = profile_registry + self._heartbeat_tick = 0 def register(self, node_id: str, agent_url: str) -> None: if node_id not in self._agents: @@ -110,10 +120,51 @@ class AgentSupervisor: async def poll_all(self) -> None: await asyncio.gather(*[self.poll_agent(nid) for nid in self._agents]) + def _build_idle_stop_config(self) -> dict[str, int]: + if self._profile_registry is None: + return {} + config: dict[str, int] = {} + for profile in self._profile_registry.list_public(): + for svc_name, svc in profile.services.items(): + if svc.idle_stop_after_s > 0: + existing = config.get(svc_name, 0) + config[svc_name] = min(existing, svc.idle_stop_after_s) if existing > 0 else svc.idle_stop_after_s + return config + + async def _http_post(self, url: str) -> bool: + try: + async with httpx.AsyncClient(timeout=10.0) as client: + resp = await client.post(url) + return resp.is_success + except Exception as exc: + logger.warning("HTTP POST %s failed: %s", url, exc) + return False + + async def _run_idle_sweep(self) -> None: + if self._service_registry is None: + return + idle_stop_config = self._build_idle_stop_config() + if not idle_stop_config: + return + timed_out = self._service_registry.idle_past_timeout(idle_stop_config) + for instance in timed_out: + node_info = self.get_node_info(instance.node_id) + if node_info is None: + continue + stop_url = f"{node_info.agent_url}/services/{instance.service}/stop" + logger.info( + "Idle sweep: stopping %s on %s gpu%s (idle timeout)", + instance.service, instance.node_id, instance.gpu_id, + ) + await self._http_post(stop_url) + async def run_heartbeat_loop(self) -> None: self._running = True while self._running: await self.poll_all() + self._heartbeat_tick += 1 + if self._heartbeat_tick % 3 == 0: + await self._run_idle_sweep() await asyncio.sleep(_HEARTBEAT_INTERVAL_S) def stop(self) -> None: diff --git a/tests/test_resources/test_agent_supervisor.py b/tests/test_resources/test_agent_supervisor.py new file mode 100644 index 0000000..f669b62 --- /dev/null +++ b/tests/test_resources/test_agent_supervisor.py @@ -0,0 +1,93 @@ +import asyncio +import time +import pytest +from unittest.mock import AsyncMock, MagicMock, patch +from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor +from circuitforge_core.resources.coordinator.lease_manager import LeaseManager +from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry, ServiceInstance + + +def test_build_idle_stop_config_empty_without_registry(): + lm = LeaseManager() + supervisor = AgentSupervisor(lease_manager=lm) + assert supervisor._build_idle_stop_config() == {} + + +def test_build_idle_stop_config_from_profiles(): + lm = LeaseManager() + mock_svc = MagicMock() + mock_svc.idle_stop_after_s = 600 + mock_profile = MagicMock() + mock_profile.services = {"vllm": mock_svc} + mock_profile_registry = MagicMock() + mock_profile_registry.list_public.return_value = [mock_profile] + + supervisor = AgentSupervisor(lease_manager=lm, profile_registry=mock_profile_registry) + config = supervisor._build_idle_stop_config() + assert config == {"vllm": 600} + + +@pytest.mark.asyncio +async def test_run_idle_sweep_posts_stop(): + lm = LeaseManager() + service_registry = ServiceRegistry() + + # Upsert instance as running, then allocate + release to transition it to idle + service_registry.upsert_instance( + service="vllm", + node_id="heimdall", + gpu_id=0, + state="running", + model="test-model", + url="http://heimdall:8000", + ) + alloc = service_registry.allocate( + service="vllm", + node_id="heimdall", + gpu_id=0, + model="test-model", + url="http://heimdall:8000", + caller="test", + ttl_s=300.0, + ) + service_registry.release(alloc.allocation_id) + + # Backdate idle_since so it exceeds the timeout + import dataclasses + key = "vllm:heimdall:0" + inst = service_registry._instances[key] + service_registry._instances[key] = dataclasses.replace(inst, idle_since=time.time() - 700) + + mock_profile_registry = MagicMock() + mock_svc = MagicMock() + mock_svc.idle_stop_after_s = 600 + mock_profile = MagicMock() + mock_profile.services = {"vllm": mock_svc} + mock_profile_registry.list_public.return_value = [mock_profile] + + supervisor = AgentSupervisor( + lease_manager=lm, + service_registry=service_registry, + profile_registry=mock_profile_registry, + ) + supervisor.register("heimdall", "http://heimdall:7701") + + posted_urls = [] + + async def fake_http_post(url: str) -> bool: + posted_urls.append(url) + return True + + supervisor._http_post = fake_http_post + await supervisor._run_idle_sweep() + + assert len(posted_urls) == 1 + assert posted_urls[0] == "http://heimdall:7701/services/vllm/stop" + + +@pytest.mark.asyncio +async def test_run_idle_sweep_skips_without_registry(): + lm = LeaseManager() + supervisor = AgentSupervisor(lease_manager=lm) + # Should return immediately without error + await supervisor._run_idle_sweep()