diff --git a/circuitforge_core/resources/coordinator/agent_supervisor.py b/circuitforge_core/resources/coordinator/agent_supervisor.py
index 63d92e7..5eb4f15 100644
--- a/circuitforge_core/resources/coordinator/agent_supervisor.py
+++ b/circuitforge_core/resources/coordinator/agent_supervisor.py
@@ -8,7 +8,7 @@ from dataclasses import dataclass, field
 
 import httpx
 
 from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
-from circuitforge_core.resources.models import GpuInfo, NodeInfo
+from circuitforge_core.resources.models import GpuInfo, NodeInfo, ResidentAllocation
 
 logger = logging.getLogger(__name__)
@@ -58,15 +58,27 @@ class AgentSupervisor:
             for r in self._agents.values()
         ]
 
+    def online_agents(self) -> "dict[str, AgentRecord]":
+        """Return only currently-online agents, keyed by node_id."""
+        return {nid: rec for nid, rec in self._agents.items() if rec.online}
+
     async def poll_agent(self, node_id: str) -> bool:
         record = self._agents.get(node_id)
         if record is None:
             return False
         try:
             async with httpx.AsyncClient(timeout=_AGENT_TIMEOUT_S) as client:
-                resp = await client.get(f"{record.agent_url}/gpu-info")
-                resp.raise_for_status()
-                data = resp.json()
+                gpu_resp = await client.get(f"{record.agent_url}/gpu-info")
+                gpu_resp.raise_for_status()
+
+                # Resident-info is best-effort — older agents may not have the endpoint.
+                try:
+                    res_resp = await client.get(f"{record.agent_url}/resident-info")
+                    resident_data = res_resp.json() if res_resp.is_success else {}
+                except Exception:
+                    resident_data = {}
+
+                data = gpu_resp.json()
             gpus = [
                 GpuInfo(
                     gpu_id=g["gpu_id"],
@@ -82,6 +94,13 @@
             record.online = True
             for gpu in gpus:
                 self._lease_manager.register_gpu(node_id, gpu.gpu_id, gpu.vram_total_mb)
+
+            residents = [
+                (r["service"], r.get("model_name"))
+                for r in resident_data.get("residents", [])
+            ]
+            self._lease_manager.set_residents_for_node(node_id, residents)
+
             return True
         except Exception as exc:
             logger.warning("Agent %s unreachable: %s", node_id, exc)
diff --git a/circuitforge_core/resources/coordinator/lease_manager.py b/circuitforge_core/resources/coordinator/lease_manager.py
index 652b64b..80c7c65 100644
--- a/circuitforge_core/resources/coordinator/lease_manager.py
+++ b/circuitforge_core/resources/coordinator/lease_manager.py
@@ -3,7 +3,7 @@ from __future__ import annotations
 import asyncio
 from collections import defaultdict
 
-from circuitforge_core.resources.models import VRAMLease
+from circuitforge_core.resources.models import ResidentAllocation, VRAMLease
 
 
 class LeaseManager:
@@ -12,6 +12,9 @@ class LeaseManager:
         self._gpu_total: dict[tuple[str, int], int] = {}
         self._gpu_used: dict[tuple[str, int], int] = defaultdict(int)
         self._lock = asyncio.Lock()
+        # Resident allocations — keyed "node_id:service", updated by heartbeat.
+        # No lock needed: only the single heartbeat task writes this dict.
+        self._residents: dict[str, ResidentAllocation] = {}
 
     def register_gpu(self, node_id: str, gpu_id: int, total_mb: int) -> None:
         self._gpu_total[(node_id, gpu_id)] = total_mb
@@ -86,3 +89,42 @@ class LeaseManager:
 
     def all_leases(self) -> list[VRAMLease]:
         return list(self._leases.values())
+
+    # ── resident tracking ────────────────────────────────────────────
+
+    def set_residents_for_node(
+        self,
+        node_id: str,
+        residents: list[tuple[str, str | None]],  # (service, model_name)
+    ) -> None:
+        """
+        Replace the resident snapshot for a node.
+
+        Preserves first_seen for entries whose service+model_name are unchanged,
+        so the dashboard can show how long a model has been warm.
+        """
+        new_keys = {f"{node_id}:{service}" for service, _ in residents}
+
+        # Remove stale entries (service no longer running on this node).
+        for key in list(self._residents):
+            if key.startswith(f"{node_id}:") and key not in new_keys:
+                del self._residents[key]
+
+        # Upsert: preserve first_seen when model is unchanged, reset otherwise.
+        for service, model_name in residents:
+            key = f"{node_id}:{service}"
+            existing = self._residents.get(key)
+            if existing is not None and existing.model_name == model_name:
+                continue  # same model still loaded — keep original first_seen
+            self._residents[key] = ResidentAllocation(
+                service=service,
+                node_id=node_id,
+                model_name=model_name,
+            )
+
+    def all_residents(self) -> list[ResidentAllocation]:
+        return list(self._residents.values())
+
+    def resident_keys(self) -> set[str]:
+        """Return set of 'node_id:service' strings for currently-warm services."""
+        return set(self._residents.keys())
diff --git a/tests/test_resources/test_coordinator_app.py b/tests/test_resources/test_coordinator_app.py
index e8f6bbe..48c40f6 100644
--- a/tests/test_resources/test_coordinator_app.py
+++ b/tests/test_resources/test_coordinator_app.py
@@ -2,6 +2,7 @@ import pytest
 from unittest.mock import MagicMock
 from fastapi.testclient import TestClient
 from circuitforge_core.resources.coordinator.app import create_coordinator_app
+from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor
 from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
 from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
 from circuitforge_core.resources.models import GpuInfo, NodeInfo
@@ -112,3 +113,22 @@
     assert "cf-orch" in resp.text
     assert "/api/nodes" in resp.text
     assert "/api/leases" in resp.text
+
+
+def test_online_agents_excludes_offline():
+    lm = LeaseManager()
+    sup = AgentSupervisor(lm)
+    sup.register("online_node", "http://a:7701")
+    sup.register("offline_node", "http://b:7701")
+    sup._agents["online_node"].online = True
+    sup._agents["offline_node"].online = False
+    result = sup.online_agents()
+    assert "online_node" in result
+    assert "offline_node" not in result
+
+
+def test_resident_keys_returns_set_of_node_service():
+    lm = LeaseManager()
+    lm.set_residents_for_node("heimdall", [("vllm", "Ouro-1.4B"), ("ollama", None)])
+    keys = lm.resident_keys()
+    assert keys == {"heimdall:vllm", "heimdall:ollama"}