feat(orch): expose online_agents() and resident_keys() helpers

This commit is contained in:
pyr0ball 2026-04-02 11:22:29 -07:00
parent d600fb6651
commit 52d2c5cf38
3 changed files with 86 additions and 5 deletions

View file

@ -8,7 +8,7 @@ from dataclasses import dataclass, field
import httpx
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
from circuitforge_core.resources.models import GpuInfo, NodeInfo
from circuitforge_core.resources.models import GpuInfo, NodeInfo, ResidentAllocation
logger = logging.getLogger(__name__)
@ -58,15 +58,27 @@ class AgentSupervisor:
for r in self._agents.values()
]
def online_agents(self) -> "dict[str, AgentRecord]":
    """Collect the registered agents whose ``online`` flag is set.

    Returns:
        A new dict mapping node_id to its agent record; agents that are
        not flagged online are left out.
    """
    reachable = {}
    for node_id, record in self._agents.items():
        if record.online:
            reachable[node_id] = record
    return reachable
async def poll_agent(self, node_id: str) -> bool:
record = self._agents.get(node_id)
if record is None:
return False
try:
async with httpx.AsyncClient(timeout=_AGENT_TIMEOUT_S) as client:
resp = await client.get(f"{record.agent_url}/gpu-info")
resp.raise_for_status()
data = resp.json()
gpu_resp = await client.get(f"{record.agent_url}/gpu-info")
gpu_resp.raise_for_status()
# Resident-info is best-effort — older agents may not have the endpoint.
try:
res_resp = await client.get(f"{record.agent_url}/resident-info")
resident_data = res_resp.json() if res_resp.is_success else {}
except Exception:
resident_data = {}
data = gpu_resp.json()
gpus = [
GpuInfo(
gpu_id=g["gpu_id"],
@ -82,6 +94,13 @@ class AgentSupervisor:
record.online = True
for gpu in gpus:
self._lease_manager.register_gpu(node_id, gpu.gpu_id, gpu.vram_total_mb)
residents = [
(r["service"], r.get("model_name"))
for r in resident_data.get("residents", [])
]
self._lease_manager.set_residents_for_node(node_id, residents)
return True
except Exception as exc:
logger.warning("Agent %s unreachable: %s", node_id, exc)

View file

@ -3,7 +3,7 @@ from __future__ import annotations
import asyncio
from collections import defaultdict
from circuitforge_core.resources.models import VRAMLease
from circuitforge_core.resources.models import ResidentAllocation, VRAMLease
class LeaseManager:
@ -12,6 +12,9 @@ class LeaseManager:
self._gpu_total: dict[tuple[str, int], int] = {}
self._gpu_used: dict[tuple[str, int], int] = defaultdict(int)
self._lock = asyncio.Lock()
# Resident allocations — keyed "node_id:service", updated by heartbeat.
# No lock needed: only the single heartbeat task writes this dict.
self._residents: dict[str, ResidentAllocation] = {}
def register_gpu(self, node_id: str, gpu_id: int, total_mb: int) -> None:
self._gpu_total[(node_id, gpu_id)] = total_mb
@ -86,3 +89,42 @@ class LeaseManager:
def all_leases(self) -> list[VRAMLease]:
    """Return a fresh list holding every lease currently tracked."""
    return [*self._leases.values()]
# ── resident tracking ────────────────────────────────────────────
def set_residents_for_node(
    self,
    node_id: str,
    residents: list[tuple[str, str | None]],  # (service, model_name)
) -> None:
    """
    Replace the resident snapshot for a node.

    Entries are stored under "node_id:service" keys. An entry whose service
    is still reported with the same model_name is left untouched, so the
    existing ResidentAllocation object (and whatever it recorded when it was
    created, e.g. first_seen) is preserved and the dashboard can show how
    long a model has been warm.

    Args:
        node_id: Node whose snapshot is being replaced.
        residents: (service, model_name) pairs currently reported by the
            node; model_name may be None.
    """
    new_keys = {f"{node_id}:{service}" for service, _ in residents}

    # Remove stale entries (service no longer running on this node).
    # Ownership is decided by the stored allocation's node_id rather than a
    # key-prefix match: a prefix test for node "a" would also match keys
    # belonging to a node named "a:b" and wrongly evict its residents.
    stale_keys = [
        key
        for key, allocation in self._residents.items()
        if allocation.node_id == node_id and key not in new_keys
    ]
    for key in stale_keys:
        del self._residents[key]

    # Upsert: keep the existing entry when the model is unchanged, otherwise
    # replace it with a fresh allocation.
    for service, model_name in residents:
        key = f"{node_id}:{service}"
        existing = self._residents.get(key)
        if existing is not None and existing.model_name == model_name:
            continue  # same model still loaded — keep the original entry
        self._residents[key] = ResidentAllocation(
            service=service,
            node_id=node_id,
            model_name=model_name,
        )
def all_residents(self) -> list[ResidentAllocation]:
    """Return a fresh list of every resident allocation currently stored."""
    return [*self._residents.values()]
def resident_keys(self) -> set[str]:
    """Return the 'node_id:service' keys of all stored residents as a new set."""
    # Iterating a dict yields its keys, so this copies them into a set.
    return set(self._residents)

View file

@ -2,6 +2,7 @@ import pytest
from unittest.mock import MagicMock
from fastapi.testclient import TestClient
from circuitforge_core.resources.coordinator.app import create_coordinator_app
from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
from circuitforge_core.resources.models import GpuInfo, NodeInfo
@ -112,3 +113,22 @@ def test_dashboard_serves_html(coordinator_client):
assert "cf-orch" in resp.text
assert "/api/nodes" in resp.text
assert "/api/leases" in resp.text
def test_online_agents_excludes_offline():
    """online_agents() must report only the agents flagged online."""
    supervisor = AgentSupervisor(LeaseManager())

    # Register both nodes first, then force their online flags directly.
    supervisor.register("online_node", "http://a:7701")
    supervisor.register("offline_node", "http://b:7701")
    for node_id, flag in (("online_node", True), ("offline_node", False)):
        supervisor._agents[node_id].online = flag

    visible = supervisor.online_agents()

    assert "online_node" in visible
    assert "offline_node" not in visible
def test_resident_keys_returns_set_of_node_service():
    """resident_keys() yields one 'node_id:service' key per reported service."""
    manager = LeaseManager()
    reported = [("vllm", "Ouro-1.4B"), ("ollama", None)]

    manager.set_residents_for_node("heimdall", reported)

    assert manager.resident_keys() == {"heimdall:vllm", "heimdall:ollama"}