feat: add idle sweep to AgentSupervisor
This commit is contained in:
parent
1e168ac636
commit
c299482e0d
3 changed files with 168 additions and 4 deletions
|
|
@ -44,11 +44,17 @@ def start(
|
|||
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
|
||||
from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor
|
||||
from circuitforge_core.resources.coordinator.app import create_coordinator_app
|
||||
from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry
|
||||
from circuitforge_core.resources.agent.gpu_monitor import GpuMonitor
|
||||
|
||||
lease_manager = LeaseManager()
|
||||
profile_registry = ProfileRegistry()
|
||||
supervisor = AgentSupervisor(lease_manager=lease_manager)
|
||||
service_registry = ServiceRegistry()
|
||||
supervisor = AgentSupervisor(
|
||||
lease_manager=lease_manager,
|
||||
service_registry=service_registry,
|
||||
profile_registry=profile_registry,
|
||||
)
|
||||
|
||||
monitor = GpuMonitor()
|
||||
gpus = monitor.poll()
|
||||
|
|
@ -79,6 +85,7 @@ def start(
|
|||
lease_manager=lease_manager,
|
||||
profile_registry=profile_registry,
|
||||
agent_supervisor=supervisor,
|
||||
service_registry=service_registry,
|
||||
)
|
||||
|
||||
typer.echo(f"Starting cf-orch coordinator on {host}:{port}")
|
||||
|
|
@ -92,6 +99,7 @@ def agent(
|
|||
host: str = "0.0.0.0",
|
||||
port: int = 7701,
|
||||
advertise_host: Optional[str] = None,
|
||||
profile: Annotated[Optional[Path], typer.Option(help="Profile YAML path")] = None,
|
||||
) -> None:
|
||||
"""Start a cf-orch node agent and self-register with the coordinator.
|
||||
|
||||
|
|
@ -101,10 +109,11 @@ def agent(
|
|||
Use --advertise-host to override the IP the coordinator should use to
|
||||
reach this agent (e.g. on a multi-homed or NATted host).
|
||||
"""
|
||||
import asyncio
|
||||
import threading
|
||||
import httpx
|
||||
from circuitforge_core.resources.agent.app import create_agent_app
|
||||
from circuitforge_core.resources.agent.service_manager import ServiceManager
|
||||
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
|
||||
|
||||
# The URL the coordinator should use to reach this agent.
|
||||
reach_host = advertise_host or ("127.0.0.1" if host in ("0.0.0.0", "::") else host)
|
||||
|
|
@ -132,7 +141,18 @@ def agent(
|
|||
# Fire registration in a daemon thread so uvicorn.run() can start blocking immediately.
|
||||
threading.Thread(target=_register_in_background, daemon=True).start()
|
||||
|
||||
agent_app = create_agent_app(node_id=node_id)
|
||||
service_manager = None
|
||||
try:
|
||||
from circuitforge_core.resources.agent.gpu_monitor import GpuMonitor
|
||||
pr = ProfileRegistry()
|
||||
gpus = GpuMonitor().poll()
|
||||
p = pr.load(Path(profile)) if profile else pr.auto_detect(gpus)
|
||||
service_manager = ServiceManager(node_id=node_id, profile=p, advertise_host=reach_host)
|
||||
typer.echo(f"ServiceManager ready with profile: {p.name}")
|
||||
except Exception as exc:
|
||||
typer.echo(f"Warning: ServiceManager unavailable ({exc})", err=True)
|
||||
|
||||
agent_app = create_agent_app(node_id=node_id, service_manager=service_manager)
|
||||
typer.echo(f"Starting cf-orch agent [{node_id}] on {host}:{port}")
|
||||
uvicorn.run(agent_app, host=host, port=port)
|
||||
|
||||
|
|
|
|||
|
|
@ -8,6 +8,8 @@ from dataclasses import dataclass, field
|
|||
import httpx
|
||||
|
||||
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
|
||||
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
|
||||
from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry
|
||||
from circuitforge_core.resources.models import GpuInfo, NodeInfo, ResidentAllocation
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
@ -26,10 +28,18 @@ class AgentRecord:
|
|||
|
||||
|
||||
class AgentSupervisor:
|
||||
def __init__(self, lease_manager: LeaseManager) -> None:
|
||||
def __init__(
    self,
    lease_manager: LeaseManager,
    service_registry: ServiceRegistry | None = None,
    profile_registry: ProfileRegistry | None = None,
) -> None:
    """Create a supervisor over registered node agents.

    Args:
        lease_manager: Required lease bookkeeping backend.
        service_registry: Optional registry of running service instances;
            without it the idle sweep is disabled (see _run_idle_sweep).
        profile_registry: Optional profile source used to derive per-service
            idle-stop timeouts; without it the idle-stop config is empty.
    """
    # node_id -> AgentRecord for every agent that has registered.
    self._agents: dict[str, AgentRecord] = {}
    self._lease_manager = lease_manager
    # Set True by run_heartbeat_loop(), cleared by stop().
    self._running = False
    self._service_registry = service_registry
    self._profile_registry = profile_registry
    # Counts heartbeat iterations; idle sweep fires every 3rd tick.
    self._heartbeat_tick = 0
|
||||
|
||||
def register(self, node_id: str, agent_url: str) -> None:
|
||||
if node_id not in self._agents:
|
||||
|
|
@ -110,10 +120,51 @@ class AgentSupervisor:
|
|||
async def poll_all(self) -> None:
    """Poll every registered agent concurrently and wait for all to finish."""
    polls = (self.poll_agent(node_id) for node_id in self._agents)
    await asyncio.gather(*polls)
|
||||
|
||||
def _build_idle_stop_config(self) -> dict[str, int]:
|
||||
if self._profile_registry is None:
|
||||
return {}
|
||||
config: dict[str, int] = {}
|
||||
for profile in self._profile_registry.list_public():
|
||||
for svc_name, svc in profile.services.items():
|
||||
if svc.idle_stop_after_s > 0:
|
||||
existing = config.get(svc_name, 0)
|
||||
config[svc_name] = min(existing, svc.idle_stop_after_s) if existing > 0 else svc.idle_stop_after_s
|
||||
return config
|
||||
|
||||
async def _http_post(self, url: str) -> bool:
    """POST to *url*; True on a 2xx response, False otherwise.

    Best effort: any exception (connect error, timeout, ...) is logged
    and swallowed so one dead agent cannot break the sweep loop.
    """
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(url)
    except Exception as exc:
        logger.warning("HTTP POST %s failed: %s", url, exc)
        return False
    return response.is_success
|
||||
|
||||
async def _run_idle_sweep(self) -> None:
    """Stop service instances that have sat idle past their profile timeout.

    Silent no-op unless both a service registry and at least one
    configured idle-stop timeout are available. Stops are issued via the
    owning agent's HTTP API; best-effort per instance.
    """
    registry = self._service_registry
    if registry is None:
        return
    timeouts = self._build_idle_stop_config()
    if not timeouts:
        return
    for instance in registry.idle_past_timeout(timeouts):
        node = self.get_node_info(instance.node_id)
        if node is None:
            # Owning agent is unknown to us; nothing to call.
            continue
        logger.info(
            "Idle sweep: stopping %s on %s gpu%s (idle timeout)",
            instance.service, instance.node_id, instance.gpu_id,
        )
        await self._http_post(f"{node.agent_url}/services/{instance.service}/stop")
|
||||
|
||||
async def run_heartbeat_loop(self) -> None:
    """Main supervision loop: poll all agents each tick, sweep idle services every 3rd tick.

    Runs until stop() clears ``self._running``. Each iteration polls every
    registered agent, then sleeps ``_HEARTBEAT_INTERVAL_S`` seconds.
    """
    self._running = True
    while self._running:
        await self.poll_all()
        self._heartbeat_tick += 1
        # Idle sweeps are needed far less often than health polls, so
        # only run one every third heartbeat.
        if self._heartbeat_tick % 3 == 0:
            await self._run_idle_sweep()
        await asyncio.sleep(_HEARTBEAT_INTERVAL_S)
|
||||
|
||||
def stop(self) -> None:
|
||||
|
|
|
|||
93
tests/test_resources/test_agent_supervisor.py
Normal file
93
tests/test_resources/test_agent_supervisor.py
Normal file
|
|
@ -0,0 +1,93 @@
|
|||
import asyncio
|
||||
import time
|
||||
import pytest
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor
|
||||
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
|
||||
from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry, ServiceInstance
|
||||
|
||||
|
||||
def test_build_idle_stop_config_empty_without_registry():
    """With no profile registry configured, the idle-stop map is empty."""
    supervisor = AgentSupervisor(lease_manager=LeaseManager())
    assert supervisor._build_idle_stop_config() == {}
|
||||
|
||||
|
||||
def test_build_idle_stop_config_from_profiles():
    """Positive idle timeouts declared in public profiles land in the config map."""
    vllm_svc = MagicMock()
    vllm_svc.idle_stop_after_s = 600
    profile = MagicMock()
    profile.services = {"vllm": vllm_svc}
    profile_registry = MagicMock()
    profile_registry.list_public.return_value = [profile]

    supervisor = AgentSupervisor(
        lease_manager=LeaseManager(),
        profile_registry=profile_registry,
    )
    assert supervisor._build_idle_stop_config() == {"vllm": 600}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_run_idle_sweep_posts_stop():
    """End-to-end sweep: an instance idle past its timeout gets a stop POST.

    Builds a real ServiceRegistry, drives one instance through the
    running -> allocated -> idle lifecycle, backdates its idle_since past
    the 600s timeout, then verifies _run_idle_sweep issues exactly one
    stop call to the owning agent's URL.
    """
    lm = LeaseManager()
    service_registry = ServiceRegistry()

    # Upsert instance as running, then allocate + release to transition it to idle
    service_registry.upsert_instance(
        service="vllm",
        node_id="heimdall",
        gpu_id=0,
        state="running",
        model="test-model",
        url="http://heimdall:8000",
    )
    alloc = service_registry.allocate(
        service="vllm",
        node_id="heimdall",
        gpu_id=0,
        model="test-model",
        url="http://heimdall:8000",
        caller="test",
        ttl_s=300.0,
    )
    service_registry.release(alloc.allocation_id)

    # Backdate idle_since so it exceeds the timeout
    # NOTE(test): reaches into ServiceRegistry._instances — assumes the
    # private "service:node:gpu" keying; revisit if that layout changes.
    import dataclasses
    key = "vllm:heimdall:0"
    inst = service_registry._instances[key]
    service_registry._instances[key] = dataclasses.replace(inst, idle_since=time.time() - 700)

    # Profile registry advertising a 600s idle-stop for "vllm".
    mock_profile_registry = MagicMock()
    mock_svc = MagicMock()
    mock_svc.idle_stop_after_s = 600
    mock_profile = MagicMock()
    mock_profile.services = {"vllm": mock_svc}
    mock_profile_registry.list_public.return_value = [mock_profile]

    supervisor = AgentSupervisor(
        lease_manager=lm,
        service_registry=service_registry,
        profile_registry=mock_profile_registry,
    )
    # Register the agent so get_node_info() can resolve the stop URL.
    supervisor.register("heimdall", "http://heimdall:7701")

    # Capture outgoing POSTs instead of hitting the network.
    posted_urls = []

    async def fake_http_post(url: str) -> bool:
        posted_urls.append(url)
        return True

    supervisor._http_post = fake_http_post
    await supervisor._run_idle_sweep()

    assert len(posted_urls) == 1
    assert posted_urls[0] == "http://heimdall:7701/services/vllm/stop"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_run_idle_sweep_skips_without_registry():
    """Sweep is a silent no-op when no service registry was provided."""
    supervisor = AgentSupervisor(lease_manager=LeaseManager())
    # Must return immediately without raising.
    await supervisor._run_idle_sweep()
|
||||
Loading…
Reference in a new issue