diff --git a/circuitforge_core/llm/router.py b/circuitforge_core/llm/router.py
index e404fbe..3a22d81 100644
--- a/circuitforge_core/llm/router.py
+++ b/circuitforge_core/llm/router.py
@@ -3,12 +3,15 @@ LLM abstraction layer with priority fallback chain.
Reads config from ~/.config/circuitforge/llm.yaml.
Tries backends in order; falls back on any error.
"""
+import logging
import os
import yaml
import requests
from pathlib import Path
from openai import OpenAI
+logger = logging.getLogger(__name__)
+
CONFIG_PATH = Path.home() / ".config" / "circuitforge" / "llm.yaml"
@@ -38,6 +41,33 @@ class LLMRouter:
models = client.models.list()
return models.data[0].id
+ def _try_cf_orch_alloc(self, backend: dict) -> "tuple | None":
+ """
+ If backend config has a cf_orch block and CF_ORCH_URL is set (env takes
+ precedence over yaml url), allocate via cf-orch and return (ctx, alloc).
+ Returns None if not configured or allocation fails.
+ Caller MUST call ctx.__exit__(None, None, None) in a finally block.
+ """
+ import os
+ orch_cfg = backend.get("cf_orch")
+ if not orch_cfg:
+ return None
+ orch_url = os.environ.get("CF_ORCH_URL", orch_cfg.get("url", ""))
+ if not orch_url:
+ return None
+ try:
+ from circuitforge_core.resources.client import CFOrchClient
+ client = CFOrchClient(orch_url)
+ service = orch_cfg.get("service", "vllm")
+ candidates = orch_cfg.get("model_candidates", [])
+ ttl_s = float(orch_cfg.get("ttl_s", 3600.0))
+ ctx = client.allocate(service, model_candidates=candidates, ttl_s=ttl_s, caller="llm-router")
+ alloc = ctx.__enter__()
+ return (ctx, alloc)
+ except Exception as exc:
+ logger.warning("[LLMRouter] cf_orch allocation failed, using base_url directly: %s", exc)
+ return None
+
def complete(self, prompt: str, system: str | None = None,
model_override: str | None = None,
fallback_order: list[str] | None = None,
@@ -105,6 +135,12 @@ class LLMRouter:
if not self._is_reachable(backend["base_url"]):
print(f"[LLMRouter] {name}: unreachable, skipping")
continue
+ # --- cf_orch: optionally override base_url with coordinator-allocated URL ---
+ orch_ctx = orch_alloc = None
+ orch_result = self._try_cf_orch_alloc(backend)
+ if orch_result is not None:
+ orch_ctx, orch_alloc = orch_result
+ backend = {**backend, "base_url": orch_alloc.url + "/v1"}
try:
client = OpenAI(
base_url=backend["base_url"],
@@ -136,6 +172,12 @@ class LLMRouter:
except Exception as e:
print(f"[LLMRouter] {name}: error — {e}, trying next")
continue
+ finally:
+ if orch_ctx is not None:
+ try:
+ orch_ctx.__exit__(None, None, None)
+ except Exception:
+ pass
elif backend["type"] == "anthropic":
api_key = os.environ.get(backend["api_key_env"], "")
diff --git a/circuitforge_core/resources/__init__.py b/circuitforge_core/resources/__init__.py
index e69de29..8bf5235 100644
--- a/circuitforge_core/resources/__init__.py
+++ b/circuitforge_core/resources/__init__.py
@@ -0,0 +1 @@
+from circuitforge_core.resources.client import CFOrchClient, Allocation # noqa: F401
diff --git a/circuitforge_core/resources/cli.py b/circuitforge_core/resources/cli.py
index 9e65f08..dc6883f 100644
--- a/circuitforge_core/resources/cli.py
+++ b/circuitforge_core/resources/cli.py
@@ -44,11 +44,17 @@ def start(
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor
from circuitforge_core.resources.coordinator.app import create_coordinator_app
+ from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry
from circuitforge_core.resources.agent.gpu_monitor import GpuMonitor
lease_manager = LeaseManager()
profile_registry = ProfileRegistry()
- supervisor = AgentSupervisor(lease_manager=lease_manager)
+ service_registry = ServiceRegistry()
+ supervisor = AgentSupervisor(
+ lease_manager=lease_manager,
+ service_registry=service_registry,
+ profile_registry=profile_registry,
+ )
monitor = GpuMonitor()
gpus = monitor.poll()
@@ -79,6 +85,7 @@ def start(
lease_manager=lease_manager,
profile_registry=profile_registry,
agent_supervisor=supervisor,
+ service_registry=service_registry,
)
typer.echo(f"Starting cf-orch coordinator on {host}:{port}")
@@ -92,6 +99,7 @@ def agent(
host: str = "0.0.0.0",
port: int = 7701,
advertise_host: Optional[str] = None,
+ profile: Annotated[Optional[Path], typer.Option(help="Profile YAML path")] = None,
) -> None:
"""Start a cf-orch node agent and self-register with the coordinator.
@@ -101,10 +109,11 @@ def agent(
Use --advertise-host to override the IP the coordinator should use to
reach this agent (e.g. on a multi-homed or NATted host).
"""
- import asyncio
import threading
import httpx
from circuitforge_core.resources.agent.app import create_agent_app
+ from circuitforge_core.resources.agent.service_manager import ServiceManager
+ from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
# The URL the coordinator should use to reach this agent.
reach_host = advertise_host or ("127.0.0.1" if host in ("0.0.0.0", "::") else host)
@@ -132,7 +141,18 @@ def agent(
# Fire registration in a daemon thread so uvicorn.run() can start blocking immediately.
threading.Thread(target=_register_in_background, daemon=True).start()
- agent_app = create_agent_app(node_id=node_id)
+ service_manager = None
+ try:
+ from circuitforge_core.resources.agent.gpu_monitor import GpuMonitor
+ pr = ProfileRegistry()
+ gpus = GpuMonitor().poll()
+ p = pr.load(Path(profile)) if profile else pr.auto_detect(gpus)
+ service_manager = ServiceManager(node_id=node_id, profile=p, advertise_host=reach_host)
+ typer.echo(f"ServiceManager ready with profile: {p.name}")
+ except Exception as exc:
+ typer.echo(f"Warning: ServiceManager unavailable ({exc})", err=True)
+
+ agent_app = create_agent_app(node_id=node_id, service_manager=service_manager)
typer.echo(f"Starting cf-orch agent [{node_id}] on {host}:{port}")
uvicorn.run(agent_app, host=host, port=port)
diff --git a/circuitforge_core/resources/client.py b/circuitforge_core/resources/client.py
new file mode 100644
index 0000000..fa0a72e
--- /dev/null
+++ b/circuitforge_core/resources/client.py
@@ -0,0 +1,126 @@
+from __future__ import annotations
+
+import logging
+from contextlib import contextmanager, asynccontextmanager
+from dataclasses import dataclass
+
+import httpx
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class Allocation:
+ allocation_id: str
+ service: str
+ node_id: str
+ gpu_id: int
+ model: str | None
+ url: str
+ started: bool
+ warm: bool
+
+
+class CFOrchClient:
+ """
+ Client for cf-orch coordinator allocation.
+
+ Sync usage (in LLMRouter or other sync code):
+ client = CFOrchClient(os.environ["CF_ORCH_URL"])
+ with client.allocate("vllm", model_candidates=["Ouro-1.4B"]) as alloc:
+ # alloc.url is the inference endpoint
+
+ Async usage (in FastAPI apps):
+ async with client.allocate_async("vllm", model_candidates=["Ouro-1.4B"]) as alloc:
+ ...
+
+ Raises ValueError immediately if coordinator_url is empty.
+ """
+
+ def __init__(self, coordinator_url: str) -> None:
+ if not coordinator_url:
+ raise ValueError("coordinator_url is empty — cf-orch not configured")
+ self._url = coordinator_url.rstrip("/")
+
+ def _build_body(self, model_candidates: list[str] | None, ttl_s: float, caller: str) -> dict:
+ return {
+ "model_candidates": model_candidates or [],
+ "ttl_s": ttl_s,
+ "caller": caller,
+ }
+
+ def _parse_allocation(self, data: dict, service: str) -> Allocation:
+ return Allocation(
+ allocation_id=data["allocation_id"],
+ service=service,
+ node_id=data["node_id"],
+ gpu_id=data["gpu_id"],
+ model=data.get("model"),
+ url=data["url"],
+ started=data.get("started", False),
+ warm=data.get("warm", False),
+ )
+
+ @contextmanager
+ def allocate(
+ self,
+ service: str,
+ *,
+ model_candidates: list[str] | None = None,
+ ttl_s: float = 3600.0,
+ caller: str = "",
+ ):
+ """Sync context manager. Allocates on enter, releases on exit."""
+ resp = httpx.post(
+ f"{self._url}/api/services/{service}/allocate",
+ json=self._build_body(model_candidates, ttl_s, caller),
+ timeout=120.0,
+ )
+ if not resp.is_success:
+ raise RuntimeError(
+ f"cf-orch allocation failed for {service!r}: "
+ f"HTTP {resp.status_code} — {resp.text[:200]}"
+ )
+ alloc = self._parse_allocation(resp.json(), service)
+ try:
+ yield alloc
+ finally:
+ try:
+ httpx.delete(
+ f"{self._url}/api/services/{service}/allocations/{alloc.allocation_id}",
+ timeout=10.0,
+ )
+ except Exception as exc:
+ logger.debug("cf-orch release failed (non-fatal): %s", exc)
+
+ @asynccontextmanager
+ async def allocate_async(
+ self,
+ service: str,
+ *,
+ model_candidates: list[str] | None = None,
+ ttl_s: float = 3600.0,
+ caller: str = "",
+ ):
+ """Async context manager. Allocates on enter, releases on exit."""
+ async with httpx.AsyncClient(timeout=120.0) as client:
+ resp = await client.post(
+ f"{self._url}/api/services/{service}/allocate",
+ json=self._build_body(model_candidates, ttl_s, caller),
+ )
+ if not resp.is_success:
+ raise RuntimeError(
+ f"cf-orch allocation failed for {service!r}: "
+ f"HTTP {resp.status_code} — {resp.text[:200]}"
+ )
+ alloc = self._parse_allocation(resp.json(), service)
+ try:
+ yield alloc
+ finally:
+ try:
+ await client.delete(
+ f"{self._url}/api/services/{service}/allocations/{alloc.allocation_id}",
+ timeout=10.0,
+ )
+ except Exception as exc:
+ logger.debug("cf-orch async release failed (non-fatal): %s", exc)
diff --git a/circuitforge_core/resources/coordinator/agent_supervisor.py b/circuitforge_core/resources/coordinator/agent_supervisor.py
index 63d92e7..8536636 100644
--- a/circuitforge_core/resources/coordinator/agent_supervisor.py
+++ b/circuitforge_core/resources/coordinator/agent_supervisor.py
@@ -8,7 +8,9 @@ from dataclasses import dataclass, field
import httpx
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
-from circuitforge_core.resources.models import GpuInfo, NodeInfo
+from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
+from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry
+from circuitforge_core.resources.models import GpuInfo, NodeInfo, ResidentAllocation
logger = logging.getLogger(__name__)
@@ -26,15 +28,27 @@ class AgentRecord:
class AgentSupervisor:
- def __init__(self, lease_manager: LeaseManager) -> None:
+ def __init__(
+ self,
+ lease_manager: LeaseManager,
+ service_registry: ServiceRegistry | None = None,
+ profile_registry: ProfileRegistry | None = None,
+ ) -> None:
self._agents: dict[str, AgentRecord] = {}
self._lease_manager = lease_manager
self._running = False
+ self._service_registry = service_registry
+ self._profile_registry = profile_registry
+ self._heartbeat_tick = 0
def register(self, node_id: str, agent_url: str) -> None:
if node_id not in self._agents:
self._agents[node_id] = AgentRecord(node_id=node_id, agent_url=agent_url)
logger.info("Registered agent node: %s @ %s", node_id, agent_url)
+ else:
+ if self._agents[node_id].agent_url != agent_url:
+ self._agents[node_id].agent_url = agent_url
+ logger.info("Updated agent URL for %s → %s", node_id, agent_url)
def get_node_info(self, node_id: str) -> NodeInfo | None:
record = self._agents.get(node_id)
@@ -58,15 +72,27 @@ class AgentSupervisor:
for r in self._agents.values()
]
+ def online_agents(self) -> "dict[str, AgentRecord]":
+ """Return only currently-online agents, keyed by node_id."""
+ return {nid: rec for nid, rec in self._agents.items() if rec.online}
+
async def poll_agent(self, node_id: str) -> bool:
record = self._agents.get(node_id)
if record is None:
return False
try:
async with httpx.AsyncClient(timeout=_AGENT_TIMEOUT_S) as client:
- resp = await client.get(f"{record.agent_url}/gpu-info")
- resp.raise_for_status()
- data = resp.json()
+ gpu_resp = await client.get(f"{record.agent_url}/gpu-info")
+ gpu_resp.raise_for_status()
+
+ # Resident-info is best-effort — older agents may not have the endpoint.
+ try:
+ res_resp = await client.get(f"{record.agent_url}/resident-info")
+ resident_data = res_resp.json() if res_resp.is_success else {}
+ except Exception:
+ resident_data = {}
+
+ data = gpu_resp.json()
gpus = [
GpuInfo(
gpu_id=g["gpu_id"],
@@ -82,6 +108,13 @@ class AgentSupervisor:
record.online = True
for gpu in gpus:
self._lease_manager.register_gpu(node_id, gpu.gpu_id, gpu.vram_total_mb)
+
+ residents = [
+ (r["service"], r.get("model_name"))
+ for r in resident_data.get("residents", [])
+ ]
+ self._lease_manager.set_residents_for_node(node_id, residents)
+
return True
except Exception as exc:
logger.warning("Agent %s unreachable: %s", node_id, exc)
@@ -91,10 +124,58 @@ class AgentSupervisor:
async def poll_all(self) -> None:
await asyncio.gather(*[self.poll_agent(nid) for nid in self._agents])
+ def _build_idle_stop_config(self) -> dict[str, int]:
+ if self._profile_registry is None:
+ return {}
+ config: dict[str, int] = {}
+ for profile in self._profile_registry.list_public():
+ for svc_name, svc in profile.services.items():
+ if svc.idle_stop_after_s > 0:
+ existing = config.get(svc_name, 0)
+ config[svc_name] = min(existing, svc.idle_stop_after_s) if existing > 0 else svc.idle_stop_after_s
+ return config
+
+ async def _http_post(self, url: str) -> bool:
+ try:
+ async with httpx.AsyncClient(timeout=10.0) as client:
+ resp = await client.post(url)
+ return resp.is_success
+ except Exception as exc:
+ logger.warning("HTTP POST %s failed: %s", url, exc)
+ return False
+
+ async def _run_idle_sweep(self) -> None:
+ if self._service_registry is None:
+ return
+ expired = self._service_registry.sweep_expired_allocations()
+ if expired:
+ logger.info("TTL sweep: expired %d allocation(s): %s", len(expired), expired)
+ idle_stop_config = self._build_idle_stop_config()
+ if not idle_stop_config:
+ return
+ timed_out = self._service_registry.idle_past_timeout(idle_stop_config)
+ for instance in timed_out:
+ node_info = self.get_node_info(instance.node_id)
+ if node_info is None:
+ continue
+ stop_url = f"{node_info.agent_url}/services/{instance.service}/stop"
+ logger.info(
+ "Idle sweep: stopping %s on %s gpu%s (idle timeout)",
+ instance.service, instance.node_id, instance.gpu_id,
+ )
+ success = await self._http_post(stop_url)
+ if success:
+ self._service_registry.mark_stopped(
+ instance.service, instance.node_id, instance.gpu_id
+ )
+
async def run_heartbeat_loop(self) -> None:
self._running = True
while self._running:
await self.poll_all()
+ self._heartbeat_tick += 1
+ if self._heartbeat_tick % 3 == 0:
+ await self._run_idle_sweep()
await asyncio.sleep(_HEARTBEAT_INTERVAL_S)
def stop(self) -> None:
diff --git a/circuitforge_core/resources/coordinator/app.py b/circuitforge_core/resources/coordinator/app.py
index abf333f..1fd7c91 100644
--- a/circuitforge_core/resources/coordinator/app.py
+++ b/circuitforge_core/resources/coordinator/app.py
@@ -11,7 +11,9 @@ from pydantic import BaseModel
from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor
from circuitforge_core.resources.coordinator.eviction_engine import EvictionEngine
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
+from circuitforge_core.resources.coordinator.node_selector import select_node
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
+from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry
_DASHBOARD_HTML = (Path(__file__).parent / "dashboard.html").read_text()
@@ -30,10 +32,29 @@ class NodeRegisterRequest(BaseModel):
agent_url: str # e.g. "http://10.1.10.71:7701"
+class ServiceEnsureRequest(BaseModel):
+ node_id: str
+ gpu_id: int = 0
+ params: dict[str, str] = {}
+ ttl_s: float = 3600.0
+ # Ordered list of model names to try; falls back down the list if VRAM is tight.
+ # The "model" key in params is used if this list is empty.
+ model_candidates: list[str] = []
+
+
+class ServiceAllocateRequest(BaseModel):
+ model_candidates: list[str] = []
+ gpu_id: int | None = None
+ params: dict[str, str] = {}
+ ttl_s: float = 3600.0
+ caller: str = ""
+
+
def create_coordinator_app(
lease_manager: LeaseManager,
profile_registry: ProfileRegistry,
agent_supervisor: AgentSupervisor,
+ service_registry: ServiceRegistry,
) -> FastAPI:
eviction_engine = EvictionEngine(lease_manager=lease_manager)
@@ -95,6 +116,20 @@ def create_coordinator_app(
]
}
+ @app.get("/api/resident")
+ def get_residents() -> dict[str, Any]:
+ return {
+ "residents": [
+ {
+ "service": r.service,
+ "node_id": r.node_id,
+ "model_name": r.model_name,
+ "first_seen": r.first_seen,
+ }
+ for r in lease_manager.all_residents()
+ ]
+ }
+
@app.get("/api/leases")
def get_leases() -> dict[str, Any]:
return {
@@ -155,4 +190,232 @@ def create_coordinator_app(
raise HTTPException(status_code=404, detail=f"Lease {lease_id!r} not found")
return {"released": True, "lease_id": lease_id}
+ @app.post("/api/services/{service}/ensure")
+ async def ensure_service(service: str, req: ServiceEnsureRequest) -> dict[str, Any]:
+ """
+ Ensure a managed service is running on the given node.
+
+ If model_candidates is provided, tries each model in order, skipping any
+ that exceed the live free VRAM on the target GPU. Falls back down the list
+ until one succeeds. The selected model is returned in the response.
+ """
+ import httpx
+
+ node_info = agent_supervisor.get_node_info(req.node_id)
+ if node_info is None:
+ raise HTTPException(422, detail=f"Unknown node_id {req.node_id!r}")
+
+ # Resolve candidate list — fall back to params["model"] if not specified.
+ candidates: list[str] = req.model_candidates or (
+ [req.params["model"]] if "model" in req.params else []
+ )
+ if not candidates:
+ raise HTTPException(422, detail="No model specified: set params.model or model_candidates")
+
+ # Live free VRAM on the target GPU (used for pre-flight filtering).
+ gpu = next((g for g in node_info.gpus if g.gpu_id == req.gpu_id), None)
+ free_mb = gpu.vram_free_mb if gpu else 0
+
+ # Profile max_mb for the service gives us the VRAM ceiling for this slot.
+ # Models larger than free_mb are skipped before we even try to start them.
+ # We use model file size as a rough proxy — skip if free_mb < half of max_mb,
+ # since a fully-loaded model typically needs ~50-80% of its param size in VRAM.
+ service_max_mb = 0
+ for p in profile_registry.list_public():
+ svc = p.services.get(service)
+ if svc:
+ service_max_mb = svc.max_mb
+ break
+
+ # Filter candidates by VRAM headroom — skip models where free VRAM
+ # is less than half of the service's max_mb ceiling.
+ if service_max_mb > 0 and free_mb < service_max_mb // 2:
+ raise HTTPException(
+ 503,
+ detail=f"Insufficient VRAM on gpu {req.gpu_id}: {free_mb}MB free, need at least {service_max_mb // 2}MB",
+ )
+
+ last_error: str = ""
+ async with httpx.AsyncClient(timeout=120.0) as client:
+ for model in candidates:
+ params_with_model = {**req.params, "model": model}
+ try:
+ start_resp = await client.post(
+ f"{node_info.agent_url}/services/{service}/start",
+ json={"gpu_id": req.gpu_id, "params": params_with_model},
+ )
+ if start_resp.is_success:
+ data = start_resp.json()
+ return {
+ "service": service,
+ "node_id": req.node_id,
+ "gpu_id": req.gpu_id,
+ "model": model,
+ "url": data.get("url"),
+ "running": data.get("running", False),
+ }
+ last_error = start_resp.text
+ except httpx.HTTPError as exc:
+ raise HTTPException(502, detail=f"Agent unreachable: {exc}")
+
+ raise HTTPException(
+ 503,
+ detail=f"All model candidates exhausted for {service!r}. Last error: {last_error}",
+ )
+
+ @app.post("/api/services/{service}/allocate")
+ async def allocate_service(service: str, req: ServiceAllocateRequest) -> dict[str, Any]:
+ """
+ Allocate a managed service — coordinator picks the best node automatically.
+ Returns a URL + allocation_id. (Allocation not tracked server-side until Phase 2.)
+ """
+ import httpx
+
+ if not req.model_candidates:
+ raise HTTPException(422, detail="model_candidates must be non-empty")
+
+ # Validate service is known in at least one profile, regardless of gpu_id
+ if not any(service in p.services for p in profile_registry.list_public()):
+ raise HTTPException(422, detail=f"Unknown service {service!r} — not in any profile")
+
+ residents = lease_manager.resident_keys()
+
+ if req.gpu_id is None:
+ online = agent_supervisor.online_agents()
+ placement = select_node(online, service, profile_registry, residents)
+ if placement is None:
+ raise HTTPException(
+ 503,
+ detail=f"No online node has capacity for service {service!r}",
+ )
+ node_id, gpu_id = placement
+ else:
+ online = agent_supervisor.online_agents()
+ node_id = next(
+ (nid for nid, rec in online.items()
+ if any(g.gpu_id == req.gpu_id for g in rec.gpus)),
+ None,
+ )
+ if node_id is None:
+ raise HTTPException(422, detail=f"No online node has gpu_id={req.gpu_id}")
+ gpu_id = req.gpu_id
+
+ node_info = agent_supervisor.get_node_info(node_id)
+ if node_info is None:
+ raise HTTPException(422, detail=f"Node {node_id!r} not found")
+
+ warm = f"{node_id}:{service}" in residents
+
+ async with httpx.AsyncClient(timeout=120.0) as client:
+ last_error = ""
+ for model in req.model_candidates:
+ try:
+ resp = await client.post(
+ f"{node_info.agent_url}/services/{service}/start",
+ json={"gpu_id": gpu_id, "params": {**req.params, "model": model}},
+ )
+ if resp.is_success:
+ data = resp.json()
+ alloc = service_registry.allocate(
+ service=service,
+ node_id=node_id,
+ gpu_id=gpu_id,
+ model=model,
+ caller=req.caller,
+ url=data.get("url", ""),
+ ttl_s=req.ttl_s,
+ )
+ return {
+ "allocation_id": alloc.allocation_id,
+ "service": service,
+ "node_id": node_id,
+ "gpu_id": gpu_id,
+ "model": model,
+ "url": data.get("url"),
+ "started": not warm,
+ "warm": warm,
+ }
+ last_error = resp.text
+ except httpx.HTTPError as exc:
+ raise HTTPException(502, detail=f"Agent unreachable: {exc}")
+
+ raise HTTPException(
+ 503,
+ detail=f"All model candidates exhausted for {service!r}. Last error: {last_error}",
+ )
+
+ @app.delete("/api/services/{service}/allocations/{allocation_id}")
+ async def release_allocation(service: str, allocation_id: str) -> dict[str, Any]:
+ existing = service_registry.get_allocation(allocation_id)
+ if existing is None or existing.service != service:
+ raise HTTPException(404, detail=f"Allocation {allocation_id!r} not found for service {service!r}")
+ released = service_registry.release(allocation_id)
+ if not released:
+ raise HTTPException(404, detail=f"Allocation {allocation_id!r} not found")
+ return {"released": True, "allocation_id": allocation_id}
+
+ @app.get("/api/services/{service}/status")
+ def get_service_status(service: str) -> dict[str, Any]:
+ instances = [i for i in service_registry.all_instances() if i.service == service]
+ allocations = [a for a in service_registry.all_allocations() if a.service == service]
+ return {
+ "service": service,
+ "instances": [
+ {
+ "node_id": i.node_id,
+ "gpu_id": i.gpu_id,
+ "state": i.state,
+ "model": i.model,
+ "url": i.url,
+ "idle_since": i.idle_since,
+ }
+ for i in instances
+ ],
+ "allocations": [
+ {
+ "allocation_id": a.allocation_id,
+ "node_id": a.node_id,
+ "gpu_id": a.gpu_id,
+ "model": a.model,
+ "caller": a.caller,
+ "url": a.url,
+ "expires_at": a.expires_at,
+ }
+ for a in allocations
+ ],
+ }
+
+ @app.get("/api/services")
+ def list_services() -> dict[str, Any]:
+ instances = service_registry.all_instances()
+ return {
+ "services": [
+ {
+ "service": i.service,
+ "node_id": i.node_id,
+ "gpu_id": i.gpu_id,
+ "state": i.state,
+ "model": i.model,
+ "url": i.url,
+ }
+ for i in instances
+ ]
+ }
+
+ @app.delete("/api/services/{service}")
+ async def stop_service(service: str, node_id: str) -> dict[str, Any]:
+ """Stop a managed service on the given node."""
+ node_info = agent_supervisor.get_node_info(node_id)
+ if node_info is None:
+ raise HTTPException(422, detail=f"Unknown node_id {node_id!r}")
+
+ import httpx
+ async with httpx.AsyncClient(timeout=30.0) as client:
+ try:
+ resp = await client.post(f"{node_info.agent_url}/services/{service}/stop")
+ resp.raise_for_status()
+ return {"service": service, "node_id": node_id, "stopped": resp.json().get("stopped", False)}
+ except httpx.HTTPError as exc:
+ raise HTTPException(502, detail=f"Agent unreachable: {exc}")
+
return app
diff --git a/circuitforge_core/resources/coordinator/dashboard.html b/circuitforge_core/resources/coordinator/dashboard.html
index 79fc9cb..a657111 100644
--- a/circuitforge_core/resources/coordinator/dashboard.html
+++ b/circuitforge_core/resources/coordinator/dashboard.html
@@ -52,8 +52,9 @@
.gpu-node { font-size: 0.75em; font-weight: 700; color: var(--indigo); margin-bottom: 1px; }
.gpu-offline .gpu-node { color: var(--orange); }
.gpu-name { font-size: 0.78em; color: var(--text); margin-bottom: 0.4rem; }
- .vram-track { background: var(--bg); border-radius: var(--radius-sm); height: 6px; margin-bottom: 0.3rem; }
- .vram-fill { height: 100%; border-radius: var(--radius-sm); transition: width 0.4s; }
+ .vram-track { position: relative; background: var(--bg); border-radius: var(--radius-sm); height: 6px; margin-bottom: 0.3rem; overflow: hidden; }
+ .vram-leased { position: absolute; left: 0; top: 0; height: 100%; background: var(--cyan); transition: width 0.4s; }
+ .vram-resident { position: absolute; top: 0; height: 100%; background: var(--amber); transition: left 0.4s, width 0.4s; }
.vram-label { font-size: 0.72em; color: var(--muted); margin-bottom: 0.25rem; }
.gpu-status { font-size: 0.72em; }
.gpu-status.idle { color: var(--green); }
@@ -62,21 +63,30 @@
.gpu-status.offline { color: var(--orange); }
.spark-track { height: 24px; background: var(--bg); border-radius: var(--radius-sm); margin-top: 0.4rem; overflow: hidden; }
- /* leases */
- #leases-table { width: 100%; border-collapse: collapse; background: var(--bg2); border: 1px solid var(--border); border-radius: var(--radius); overflow: hidden; margin-bottom: 1rem; }
- #leases-table th { background: var(--bg3); color: var(--dim); font-size: 0.72em; font-weight: 600; text-transform: uppercase; letter-spacing: 0.05em; padding: 0.4rem 0.6rem; text-align: left; border-bottom: 1px solid var(--border); }
- #leases-table td { padding: 0.35rem 0.6rem; border-bottom: 1px solid var(--border-dim); font-size: 0.8em; vertical-align: middle; }
- #leases-table tr:last-child td { border-bottom: none; }
+ /* shared table base */
+ .cf-table { width: 100%; border-collapse: collapse; background: var(--bg2); border: 1px solid var(--border); border-radius: var(--radius); overflow: hidden; margin-bottom: 1rem; }
+ .cf-table th { background: var(--bg3); color: var(--dim); font-size: 0.72em; font-weight: 600; text-transform: uppercase; letter-spacing: 0.05em; padding: 0.4rem 0.6rem; text-align: left; border-bottom: 1px solid var(--border); }
+ .cf-table td { padding: 0.35rem 0.6rem; border-bottom: 1px solid var(--border-dim); font-size: 0.8em; vertical-align: middle; }
+ .cf-table tr:last-child td { border-bottom: none; }
.td-service { color: var(--indigo); font-weight: 600; }
.td-node { color: var(--muted); }
.td-mb { color: var(--text); }
.td-priority { color: var(--amber); }
+ .td-model { color: var(--cyan); font-size: 0.75em; }
+ .td-warm { color: var(--amber); }
.td-none { color: var(--dim); font-style: italic; }
.ttl-wrap { display: flex; align-items: center; gap: 0.5rem; }
.ttl-label { color: var(--cyan); font-variant-numeric: tabular-nums; white-space: nowrap; }
.ttl-track { flex: 1; background: var(--bg); border-radius: var(--radius-sm); height: 4px; }
.ttl-fill { height: 100%; border-radius: var(--radius-sm); background: var(--cyan); transition: width 0.4s; }
+ /* service state classes */
+ .state-running { color: #2ecc40; }
+ .state-idle { color: #ff851b; }
+ .state-stopped { color: #aaa; }
+ .state-starting { color: #0074d9; }
+ .state-unknown { color: #ff4136; }
+
/* error */
#error-banner { display: none; background: rgba(248,81,73,.1); border: 1px solid var(--red); border-radius: var(--radius); color: var(--red); padding: 0.5rem 0.75rem; font-size: 0.82em; margin-bottom: 1rem; }
@@ -102,8 +112,20 @@
GPU Nodes
+
+
Service Instances
+
+
+
+ | Service | Node | GPU | State | Model | URL |
+
+
+
+
+
+
Active Leases
-
+
| Service | Node / GPU | VRAM | Priority | TTL / Expires |
@@ -112,10 +134,22 @@
+Warm Models
+
+
+
+ | Service | Node | Model | Warm Since |
+
+
+
+
+
@@ -198,6 +232,41 @@ setInterval(() => {
document.getElementById('countdown').textContent = countdown;
}, 1000);
+// ── state class helper ───────────────────────────────────────────
+function stateClass(state) {
+ const map = { running: 'state-running', idle: 'state-idle', stopped: 'state-stopped', starting: 'state-starting' };
+ return map[state] || 'state-unknown';
+}
+
+// ── render: services table ───────────────────────────────────────
+function renderServices(services) {
+ const tbody = document.getElementById('services-body');
+ if (!services || services.length === 0) {
+ const tr = document.createElement('tr');
+ const td = el('td', { cls: 'td-none', text: 'No service instances registered.' });
+ td.setAttribute('colspan', '6');
+ tr.appendChild(td);
+ setChildren(tbody, tr);
+ return;
+ }
+
+ const rows = services.map(svc => {
+ const tr = document.createElement('tr');
+ const fields = [
+ { text: svc.service, cls: 'td-service' },
+ { text: svc.node_id, cls: 'td-node' },
+ { text: String(svc.gpu_id), cls: 'td-mb' },
+ { text: svc.state, cls: stateClass(svc.state) },
+ { text: svc.model || '\u2014', cls: 'td-model' },
+ { text: svc.url || '\u2014', cls: 'td-node' },
+ ];
+ fields.forEach(f => tr.appendChild(el('td', { cls: f.cls, text: f.text })));
+ return tr;
+ });
+
+ setChildren(tbody, ...rows);
+}
+
// ── render: health strip ─────────────────────────────────────────
function renderHealth(ok) {
const strip = document.getElementById('health-strip');
@@ -206,7 +275,8 @@ function renderHealth(ok) {
}
// ── render: GPU grid ─────────────────────────────────────────────
-function renderNodes(nodes) {
+// leasedByGpu: "nodeId:gpuId" → total MB currently leased (from active leases)
+function renderNodes(nodes, leasedByGpu) {
const grid = document.getElementById('gpu-grid');
if (!nodes || nodes.length === 0) {
setChildren(grid, el('div', { text: 'No nodes registered.', style: { color: 'var(--dim)', fontSize: '0.8em', padding: '0.5rem' } }));
@@ -216,33 +286,46 @@ function renderNodes(nodes) {
const cards = [];
for (const node of nodes) {
for (const gpu of node.gpus) {
- const key = node.node_id + ':' + gpu.gpu_id;
- const pct = gpu.vram_total_mb > 0 ? gpu.vram_used_mb / gpu.vram_total_mb : 0;
- const usedGb = (gpu.vram_used_mb / 1024).toFixed(1);
- const totalGb = (gpu.vram_total_mb / 1024).toFixed(1);
- const color = vramColor(pct);
+ const key = node.node_id + ':' + gpu.gpu_id;
+ const total = gpu.vram_total_mb || 1;
+ const used = gpu.vram_used_mb;
+ const leased = leasedByGpu[key] || 0;
+ // Resident = nvidia-smi used minus actively leased; clamped to [0, used].
+ const resident = Math.max(0, Math.min(used - leased, used));
+ const pct = used / total;
if (!sparkHistory[key]) sparkHistory[key] = [];
- sparkHistory[key].push(gpu.vram_used_mb);
+ sparkHistory[key].push(used);
if (sparkHistory[key].length > 20) sparkHistory[key].shift();
const statusCls = pct >= 0.9 ? 'full' : pct >= 0.1 ? 'busy' : 'idle';
const statusText = pct >= 0.9 ? 'saturated' : pct >= 0.1 ? Math.round(pct * 100) + '% used' : 'idle';
- const card = el('div', { cls: 'gpu-card' });
-
+ const card = el('div', { cls: 'gpu-card' });
const nodeLabel = el('div', { cls: 'gpu-node', text: node.node_id.toUpperCase() + ' · GPU ' + gpu.gpu_id });
const nameLine = el('div', { cls: 'gpu-name', text: gpu.name || 'Unknown GPU' });
- const track = el('div', { cls: 'vram-track' });
- const fill = el('div', { cls: 'vram-fill', style: { width: (pct * 100).toFixed(1) + '%', background: color } });
- track.appendChild(fill);
+ // Stacked bar: cyan (leased) → amber (resident) → dark bg (free).
+ const leasedPct = (leased / total * 100).toFixed(1);
+ const residentPct = (resident / total * 100).toFixed(1);
+ const track = el('div', { cls: 'vram-track' });
+ const fillLeased = el('div', { cls: 'vram-leased', style: { width: leasedPct + '%' } });
+ const fillResident = el('div', { cls: 'vram-resident', style: { left: leasedPct + '%', width: residentPct + '%' } });
+ append(track, fillLeased, fillResident);
- const vramLbl = el('div', { cls: 'vram-label', text: usedGb + ' / ' + totalGb + ' GB' });
- const statusEl = el('div', { cls: 'gpu-status ' + statusCls, text: statusText });
+ // Breakdown label when something is allocated.
+ let labelText = (used / 1024).toFixed(1) + ' / ' + (total / 1024).toFixed(1) + ' GB';
+ if (leased > 0 || resident > 0) {
+ const parts = [];
+ if (leased > 0) parts.push((leased / 1024).toFixed(1) + 'G leased');
+ if (resident > 0) parts.push((resident / 1024).toFixed(1) + 'G resident');
+ labelText += ' (' + parts.join(' · ') + ')';
+ }
+ const vramLbl = el('div', { cls: 'vram-label', text: labelText });
+ const statusEl = el('div', { cls: 'gpu-status ' + statusCls, text: statusText });
const sparkTrack = el('div', { cls: 'spark-track' });
- sparkTrack.appendChild(buildSparkline(sparkHistory[key], gpu.vram_total_mb));
+ sparkTrack.appendChild(buildSparkline(sparkHistory[key], total));
append(card, nodeLabel, nameLine, track, vramLbl, statusEl, sparkTrack);
cards.push(card);
@@ -252,6 +335,40 @@ function renderNodes(nodes) {
setChildren(grid, ...cards);
}
+// ── render: warm models table ────────────────────────────────────
+function renderResidents(residents) {
+ const tbody = document.getElementById('resident-body');
+ if (!residents || residents.length === 0) {
+ const tr = document.createElement('tr');
+ const td = el('td', { cls: 'td-none', text: 'No warm models detected.' });
+ td.setAttribute('colspan', '4');
+ tr.appendChild(td);
+ setChildren(tbody, tr);
+ return;
+ }
+
+ const now = Date.now() / 1000;
+ const rows = residents.map(r => {
+ const warmSecs = now - (r.first_seen || now);
+ const warmText = warmSecs < 60
+ ? Math.floor(warmSecs) + 's'
+ : warmSecs < 3600
+ ? Math.floor(warmSecs / 60) + 'm ' + String(Math.floor(warmSecs % 60)).padStart(2, '0') + 's'
+ : Math.floor(warmSecs / 3600) + 'h ' + String(Math.floor((warmSecs % 3600) / 60)).padStart(2, '0') + 'm';
+
+ const tr = document.createElement('tr');
+ append(tr,
+ el('td', { cls: 'td-service', text: r.service }),
+ el('td', { cls: 'td-node', text: r.node_id }),
+ el('td', { cls: 'td-model', text: r.model_name || '—' }),
+ el('td', { cls: 'td-warm', text: warmText }),
+ );
+ return tr;
+ });
+
+ setChildren(tbody, ...rows);
+}
+
// ── render: leases table ─────────────────────────────────────────
function renderLeases(leases) {
const tbody = document.getElementById('leases-body');
@@ -316,17 +433,33 @@ function clearError() { document.getElementById('error-banner').style.display =
// ── poll ─────────────────────────────────────────────────────────
async function poll() {
try {
- const [nodesRes, leasesRes, healthRes] = await Promise.all([
+ const [nodesRes, leasesRes, residentRes, healthRes, servicesRes] = await Promise.all([
fetch('/api/nodes'),
fetch('/api/leases'),
+ fetch('/api/resident'),
fetch('/api/health'),
+ fetch('/api/services'),
]);
if (!nodesRes.ok || !leasesRes.ok) throw new Error('API error: ' + nodesRes.status);
- const [nodesData, leasesData] = await Promise.all([nodesRes.json(), leasesRes.json()]);
+ const [nodesData, leasesData, residentData, servicesData] = await Promise.all([
+ nodesRes.json(), leasesRes.json(),
+ residentRes.ok ? residentRes.json() : Promise.resolve({ residents: [] }),
+ servicesRes.ok ? servicesRes.json() : Promise.resolve({ services: [] }),
+ ]);
+
+ // Build per-GPU leased-MB index for the stacked bar.
+ const leasedByGpu = {};
+ for (const lease of (leasesData.leases || [])) {
+ const key = lease.node_id + ':' + lease.gpu_id;
+ leasedByGpu[key] = (leasedByGpu[key] || 0) + lease.mb_granted;
+ }
+
clearError();
renderHealth(healthRes.ok);
- renderNodes(nodesData.nodes || []);
+ renderNodes(nodesData.nodes || [], leasedByGpu);
+ renderServices(servicesData.services || []);
renderLeases(leasesData.leases || []);
+ renderResidents(residentData.residents || []);
} catch (err) {
showError('Failed to reach coordinator: ' + err.message);
renderHealth(false);
diff --git a/circuitforge_core/resources/coordinator/lease_manager.py b/circuitforge_core/resources/coordinator/lease_manager.py
index 652b64b..80c7c65 100644
--- a/circuitforge_core/resources/coordinator/lease_manager.py
+++ b/circuitforge_core/resources/coordinator/lease_manager.py
@@ -3,7 +3,7 @@ from __future__ import annotations
import asyncio
from collections import defaultdict
-from circuitforge_core.resources.models import VRAMLease
+from circuitforge_core.resources.models import ResidentAllocation, VRAMLease
class LeaseManager:
@@ -12,6 +12,9 @@ class LeaseManager:
self._gpu_total: dict[tuple[str, int], int] = {}
self._gpu_used: dict[tuple[str, int], int] = defaultdict(int)
self._lock = asyncio.Lock()
+ # Resident allocations — keyed "node_id:service", updated by heartbeat.
+ # No lock needed: only the single heartbeat task writes this dict.
+ self._residents: dict[str, ResidentAllocation] = {}
def register_gpu(self, node_id: str, gpu_id: int, total_mb: int) -> None:
self._gpu_total[(node_id, gpu_id)] = total_mb
@@ -86,3 +89,42 @@ class LeaseManager:
def all_leases(self) -> list[VRAMLease]:
return list(self._leases.values())
+
+ # ── resident tracking ────────────────────────────────────────────
+
+ def set_residents_for_node(
+ self,
+ node_id: str,
+ residents: list[tuple[str, str | None]], # (service, model_name)
+ ) -> None:
+ """
+ Replace the resident snapshot for a node.
+
+ Preserves first_seen for entries whose service+model_name are unchanged,
+ so the dashboard can show how long a model has been warm.
+ """
+ new_keys = {f"{node_id}:{service}" for service, _ in residents}
+
+ # Remove stale entries (service no longer running on this node).
+ for key in list(self._residents):
+ if key.startswith(f"{node_id}:") and key not in new_keys:
+ del self._residents[key]
+
+ # Upsert: preserve first_seen when model is unchanged, reset otherwise.
+ for service, model_name in residents:
+ key = f"{node_id}:{service}"
+ existing = self._residents.get(key)
+ if existing is not None and existing.model_name == model_name:
+ continue # same model still loaded — keep original first_seen
+ self._residents[key] = ResidentAllocation(
+ service=service,
+ node_id=node_id,
+ model_name=model_name,
+ )
+
+ def all_residents(self) -> list[ResidentAllocation]:
+ return list(self._residents.values())
+
+ def resident_keys(self) -> set[str]:
+ """Return set of 'node_id:service' strings for currently-warm services."""
+ return set(self._residents.keys())
diff --git a/circuitforge_core/resources/coordinator/node_selector.py b/circuitforge_core/resources/coordinator/node_selector.py
new file mode 100644
index 0000000..9cdb9f4
--- /dev/null
+++ b/circuitforge_core/resources/coordinator/node_selector.py
@@ -0,0 +1,74 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from circuitforge_core.resources.coordinator.agent_supervisor import AgentRecord
+ from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
+
+_WARM_BONUS_MB = 1000
+
+
+@dataclass(frozen=True)
+class _Scored:
+ node_id: str
+ gpu_id: int
+ vram_free_mb: int
+ effective_free_mb: int
+ can_fit: bool
+ warm: bool
+
+
+def select_node(
+ agents: "dict[str, AgentRecord]",
+ service: str,
+ profile_registry: "ProfileRegistry",
+ resident_keys: set[str],
+) -> tuple[str, int] | None:
+ """
+ Pick the best (node_id, gpu_id) for the requested service.
+ Warm nodes (service already running) get priority, then sorted by free VRAM.
+ Returns None if no suitable node exists.
+ """
+ service_max_mb = _find_service_max_mb(service, profile_registry)
+ if service_max_mb is None:
+ return None # service not in any profile
+
+ candidates: list[_Scored] = []
+ for node_id, record in agents.items():
+ if not record.online:
+ continue
+ for gpu in record.gpus:
+ warm = f"{node_id}:{service}" in resident_keys
+ effective = gpu.vram_free_mb + (_WARM_BONUS_MB if warm else 0)
+ can_fit = gpu.vram_free_mb >= service_max_mb // 2
+ candidates.append(_Scored(
+ node_id=node_id,
+ gpu_id=gpu.gpu_id,
+ vram_free_mb=gpu.vram_free_mb,
+ effective_free_mb=effective,
+ can_fit=can_fit,
+ warm=warm,
+ ))
+ if not candidates:
+ return None
+ # Prefer: (1) warm nodes (model already resident — no cold start)
+ # (2) cold nodes that can fit the service (free >= half of max_mb)
+ # Fallback: best-effort node when nothing fits and nothing is warm
+ # (coordinator will attempt to start the service anyway; it may evict or fail)
+ # Note: resident_keys are per-node, not per-GPU. On multi-GPU nodes, the warm
+ # bonus applies to all GPUs on the node. This is a known coarseness —
+ # per-GPU resident tracking requires a resident_key format change.
+ preferred = [c for c in candidates if c.warm or c.can_fit]
+ pool = preferred if preferred else candidates
+ best = max(pool, key=lambda c: (c.warm, c.effective_free_mb))
+ return best.node_id, best.gpu_id
+
+
+def _find_service_max_mb(service: str, profile_registry: "ProfileRegistry") -> int | None:
+ for profile in profile_registry.list_public():
+ svc = profile.services.get(service)
+ if svc is not None:
+ return svc.max_mb
+ return None
diff --git a/circuitforge_core/resources/coordinator/service_registry.py b/circuitforge_core/resources/coordinator/service_registry.py
new file mode 100644
index 0000000..c258ad7
--- /dev/null
+++ b/circuitforge_core/resources/coordinator/service_registry.py
@@ -0,0 +1,170 @@
+from __future__ import annotations
+
+import dataclasses
+import time
+import uuid
+from dataclasses import dataclass
+from typing import Literal
+
+
+@dataclass
+class ServiceAllocation:
+ allocation_id: str
+ service: str
+ node_id: str
+ gpu_id: int
+ model: str | None
+ caller: str
+ url: str
+ created_at: float
+ expires_at: float # 0 = no expiry
+
+
+@dataclass
+class ServiceInstance:
+ service: str
+ node_id: str
+ gpu_id: int
+ state: Literal["starting", "running", "idle", "stopped"]
+ model: str | None
+ url: str | None
+ idle_since: float | None = None
+
+
+class ServiceRegistry:
+ """
+ In-memory registry of service allocations and instance state.
+
+ Allocations: per-caller request — many per service instance.
+ Instances: per (service, node_id, gpu_id) — one per running container.
+ """
+
+ def __init__(self) -> None:
+ self._allocations: dict[str, ServiceAllocation] = {}
+ self._instances: dict[str, ServiceInstance] = {} # key: "service:node_id:gpu_id"
+
+ # ── allocation API ────────────────────────────────────────────────
+
+ def allocate(
+ self,
+ service: str,
+ node_id: str,
+ gpu_id: int,
+ model: str | None,
+ url: str,
+ caller: str,
+ ttl_s: float,
+ ) -> ServiceAllocation:
+ alloc = ServiceAllocation(
+ allocation_id=str(uuid.uuid4()),
+ service=service,
+ node_id=node_id,
+ gpu_id=gpu_id,
+ model=model,
+ caller=caller,
+ url=url,
+ created_at=time.time(),
+ expires_at=time.time() + ttl_s if ttl_s > 0 else 0.0,
+ )
+ self._allocations[alloc.allocation_id] = alloc
+
+ # If an instance exists in idle/stopped state, mark it running again
+ key = f"{service}:{node_id}:{gpu_id}"
+ if key in self._instances:
+ inst = self._instances[key]
+ if inst.state in ("idle", "stopped"):
+ self._instances[key] = dataclasses.replace(
+ inst, state="running", idle_since=None
+ )
+ return alloc
+
+ def release(self, allocation_id: str) -> bool:
+ alloc = self._allocations.pop(allocation_id, None)
+ if alloc is None:
+ return False
+ # If no active allocations remain for this instance, mark it idle
+ key = f"{alloc.service}:{alloc.node_id}:{alloc.gpu_id}"
+ if self.active_allocations(alloc.service, alloc.node_id, alloc.gpu_id) == 0:
+ if key in self._instances:
+ self._instances[key] = dataclasses.replace(
+ self._instances[key], state="idle", idle_since=time.time()
+ )
+ return True
+
+ def active_allocations(self, service: str, node_id: str, gpu_id: int) -> int:
+ return sum(
+ 1 for a in self._allocations.values()
+ if a.service == service and a.node_id == node_id and a.gpu_id == gpu_id
+ )
+
+ # ── instance API ─────────────────────────────────────────────────
+
+ def upsert_instance(
+ self,
+ service: str,
+ node_id: str,
+ gpu_id: int,
+ state: Literal["starting", "running", "idle", "stopped"],
+ model: str | None,
+ url: str | None,
+ ) -> ServiceInstance:
+ key = f"{service}:{node_id}:{gpu_id}"
+ existing = self._instances.get(key)
+ idle_since: float | None = None
+ if state == "idle":
+ # Preserve idle_since if already idle; set now if transitioning into idle
+ idle_since = existing.idle_since if (existing and existing.state == "idle") else time.time()
+ inst = ServiceInstance(
+ service=service, node_id=node_id, gpu_id=gpu_id,
+ state=state, model=model, url=url, idle_since=idle_since,
+ )
+ self._instances[key] = inst
+ return inst
+
+ def get_allocation(self, allocation_id: str) -> ServiceAllocation | None:
+ return self._allocations.get(allocation_id)
+
+ def sweep_expired_allocations(self) -> list[str]:
+ """
+ Remove all allocations whose TTL has elapsed and transition the
+ corresponding instance to 'idle' if no active allocations remain.
+ Returns the list of expired allocation_ids.
+ """
+ now = time.time()
+ expired = [
+ alloc_id
+ for alloc_id, alloc in self._allocations.items()
+ if alloc.expires_at > 0 and now > alloc.expires_at
+ ]
+ for alloc_id in expired:
+ self.release(alloc_id)
+ return expired
+
+ def all_allocations(self) -> list[ServiceAllocation]:
+ return list(self._allocations.values())
+
+ def all_instances(self) -> list[ServiceInstance]:
+ return list(self._instances.values())
+
+ def mark_stopped(self, service: str, node_id: str, gpu_id: int) -> None:
+ """Transition an instance to 'stopped' state and clear idle_since."""
+ key = f"{service}:{node_id}:{gpu_id}"
+ if key in self._instances:
+ self._instances[key] = dataclasses.replace(
+ self._instances[key], state="stopped", idle_since=None
+ )
+
+ def idle_past_timeout(self, idle_stop_config: dict[str, int]) -> list[ServiceInstance]:
+ """
+ Return instances in 'idle' state whose idle time exceeds their configured timeout.
+ idle_stop_config: {service_name: seconds} — 0 means never stop automatically.
+ """
+ now = time.time()
+ result = []
+ for inst in self._instances.values():
+ if inst.state != "idle" or inst.idle_since is None:
+ continue
+ timeout = idle_stop_config.get(inst.service, 0)
+ if timeout > 0 and (now - inst.idle_since) >= timeout:
+ result.append(inst)
+ return result
diff --git a/circuitforge_core/resources/profiles/public/single-gpu-16gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-16gb.yaml
index 7ad59f9..84daf6a 100644
--- a/circuitforge_core/resources/profiles/public/single-gpu-16gb.yaml
+++ b/circuitforge_core/resources/profiles/public/single-gpu-16gb.yaml
@@ -6,6 +6,7 @@ services:
vllm:
max_mb: 12288
priority: 1
+ idle_stop_after_s: 600
ollama:
max_mb: 12288
priority: 1
@@ -14,6 +15,11 @@ services:
priority: 2
shared: true
max_concurrent: 4
+ cf-docuvision:
+ max_mb: 6144
+ priority: 2
+ shared: true
+ max_concurrent: 3
cf-stt:
max_mb: 1200
priority: 2
diff --git a/circuitforge_core/resources/profiles/public/single-gpu-24gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-24gb.yaml
index 4f98eb8..e0ca256 100644
--- a/circuitforge_core/resources/profiles/public/single-gpu-24gb.yaml
+++ b/circuitforge_core/resources/profiles/public/single-gpu-24gb.yaml
@@ -6,6 +6,7 @@ services:
vllm:
max_mb: 20480
priority: 1
+ idle_stop_after_s: 600
ollama:
max_mb: 18432
priority: 1
@@ -14,6 +15,11 @@ services:
priority: 2
shared: true
max_concurrent: 6
+ cf-docuvision:
+ max_mb: 8192
+ priority: 2
+ shared: true
+ max_concurrent: 4
cf-stt:
max_mb: 1200
priority: 2
diff --git a/circuitforge_core/resources/profiles/public/single-gpu-6gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-6gb.yaml
index 92168ef..1888aa4 100644
--- a/circuitforge_core/resources/profiles/public/single-gpu-6gb.yaml
+++ b/circuitforge_core/resources/profiles/public/single-gpu-6gb.yaml
@@ -6,6 +6,17 @@ services:
vllm:
max_mb: 4096
priority: 1
+ idle_stop_after_s: 600
+ managed:
+ type: docker
+ image: "vllm/vllm-openai:v0.9.2"
+ port: 8000
+ host_port: 8000
+ command_template: "--model /models/{model} --trust-remote-code --max-model-len {max_model_len} --gpu-memory-utilization {gpu_mem_util} --enforce-eager --max-num-seqs 8"
+ volumes:
+ - "${VLLM_MODELS_DIR:-/Library/Assets/LLM/vllm/models}:/models"
+ runtime: nvidia
+ ipc: host
ollama:
max_mb: 3584
priority: 1
@@ -14,6 +25,11 @@ services:
priority: 2
shared: true
max_concurrent: 2
+ cf-docuvision:
+ max_mb: 3072
+ priority: 2
+ shared: true
+ max_concurrent: 1
cf-stt:
max_mb: 600
priority: 2
diff --git a/circuitforge_core/resources/profiles/public/single-gpu-8gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-8gb.yaml
index 7053419..614416d 100644
--- a/circuitforge_core/resources/profiles/public/single-gpu-8gb.yaml
+++ b/circuitforge_core/resources/profiles/public/single-gpu-8gb.yaml
@@ -6,6 +6,17 @@ services:
vllm:
max_mb: 5120
priority: 1
+ idle_stop_after_s: 600
+ managed:
+ type: docker
+ image: "vllm/vllm-openai:v0.9.2"
+ port: 8000
+ host_port: 8000
+ command_template: "--model /models/{model} --trust-remote-code --max-model-len {max_model_len} --gpu-memory-utilization {gpu_mem_util} --enforce-eager --max-num-seqs 8"
+ volumes:
+ - "${VLLM_MODELS_DIR:-/Library/Assets/LLM/vllm/models}:/models"
+ runtime: nvidia
+ ipc: host
ollama:
max_mb: 4096
priority: 1
@@ -14,6 +25,11 @@ services:
priority: 2
shared: true
max_concurrent: 3
+ cf-docuvision:
+ max_mb: 4096
+ priority: 2
+ shared: true
+ max_concurrent: 2
cf-stt:
max_mb: 1200
priority: 2
@@ -28,6 +44,13 @@ services:
comfyui:
max_mb: 6144
priority: 4
+ managed:
+ type: process
+ exec_path: "/opt/miniconda3/envs/comfyui/bin/python"
+ args_template: "/opt/ComfyUI/main.py --listen 0.0.0.0 --port {port} --cuda-device {gpu_id}"
+ cwd: "/opt/ComfyUI"
+ port: 8188
+ host_port: 8188
model_size_hints:
llm_max_params: 8b
image_gen_max: sdxl-fp8
diff --git a/circuitforge_core/resources/profiles/schema.py b/circuitforge_core/resources/profiles/schema.py
index ac59020..1ba8ee5 100644
--- a/circuitforge_core/resources/profiles/schema.py
+++ b/circuitforge_core/resources/profiles/schema.py
@@ -5,22 +5,72 @@ from pathlib import Path
from typing import Any
import yaml
-from pydantic import BaseModel, Field
+from pydantic import BaseModel, Field, model_validator
SUPPORTED_SCHEMA_VERSION = 1
+class DockerSpec(BaseModel):
+ """Spec for a Docker-managed service."""
+
+ image: str
+ port: int
+ host_port: int
+ command_template: str = ""
+ volumes: list[str] = Field(default_factory=list)
+ env: dict[str, str] = Field(default_factory=dict)
+ runtime: str = "nvidia"
+ ipc: str = "host"
+
+ model_config = {"frozen": True}
+
+
+class ProcessSpec(BaseModel):
+ """Spec for a process-managed service (non-Docker, e.g. conda env)."""
+
+ exec_path: str
+ args_template: str = ""
+ cwd: str = ""
+ env: dict[str, str] = Field(default_factory=dict)
+ port: int = 0
+ host_port: int = 0
+
+ model_config = {"frozen": True}
+
+
class ServiceProfile(BaseModel):
max_mb: int
priority: int
shared: bool = False
max_concurrent: int = 1
always_on: bool = False
+ idle_stop_after_s: int = 0
backend: str | None = None
consumers: list[str] = Field(default_factory=list)
+ managed: DockerSpec | ProcessSpec | None = None
model_config = {"frozen": True}
+ @model_validator(mode="before")
+ @classmethod
+ def _parse_managed(cls, values: Any) -> Any:
+ if not isinstance(values, dict):
+ return values
+ raw = values.get("managed")
+ if raw is None:
+ return values
+ if not isinstance(raw, dict):
+ return values
+ spec_type = raw.get("type")
+ managed_fields = {k: v for k, v in raw.items() if k != "type"}
+ if spec_type == "docker":
+ values["managed"] = DockerSpec(**managed_fields)
+ elif spec_type == "process":
+ values["managed"] = ProcessSpec(**managed_fields)
+ else:
+ raise ValueError(f"Unknown managed service type: {spec_type!r}")
+ return values
+
class GpuNodeEntry(BaseModel):
id: int
diff --git a/tests/test_resources/test_agent_supervisor.py b/tests/test_resources/test_agent_supervisor.py
new file mode 100644
index 0000000..f669b62
--- /dev/null
+++ b/tests/test_resources/test_agent_supervisor.py
@@ -0,0 +1,93 @@
+import asyncio
+import time
+import pytest
+from unittest.mock import AsyncMock, MagicMock, patch
+from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor
+from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
+from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry, ServiceInstance
+
+
+def test_build_idle_stop_config_empty_without_registry():
+ lm = LeaseManager()
+ supervisor = AgentSupervisor(lease_manager=lm)
+ assert supervisor._build_idle_stop_config() == {}
+
+
+def test_build_idle_stop_config_from_profiles():
+ lm = LeaseManager()
+ mock_svc = MagicMock()
+ mock_svc.idle_stop_after_s = 600
+ mock_profile = MagicMock()
+ mock_profile.services = {"vllm": mock_svc}
+ mock_profile_registry = MagicMock()
+ mock_profile_registry.list_public.return_value = [mock_profile]
+
+ supervisor = AgentSupervisor(lease_manager=lm, profile_registry=mock_profile_registry)
+ config = supervisor._build_idle_stop_config()
+ assert config == {"vllm": 600}
+
+
+@pytest.mark.asyncio
+async def test_run_idle_sweep_posts_stop():
+ lm = LeaseManager()
+ service_registry = ServiceRegistry()
+
+ # Upsert instance as running, then allocate + release to transition it to idle
+ service_registry.upsert_instance(
+ service="vllm",
+ node_id="heimdall",
+ gpu_id=0,
+ state="running",
+ model="test-model",
+ url="http://heimdall:8000",
+ )
+ alloc = service_registry.allocate(
+ service="vllm",
+ node_id="heimdall",
+ gpu_id=0,
+ model="test-model",
+ url="http://heimdall:8000",
+ caller="test",
+ ttl_s=300.0,
+ )
+ service_registry.release(alloc.allocation_id)
+
+ # Backdate idle_since so it exceeds the timeout
+ import dataclasses
+ key = "vllm:heimdall:0"
+ inst = service_registry._instances[key]
+ service_registry._instances[key] = dataclasses.replace(inst, idle_since=time.time() - 700)
+
+ mock_profile_registry = MagicMock()
+ mock_svc = MagicMock()
+ mock_svc.idle_stop_after_s = 600
+ mock_profile = MagicMock()
+ mock_profile.services = {"vllm": mock_svc}
+ mock_profile_registry.list_public.return_value = [mock_profile]
+
+ supervisor = AgentSupervisor(
+ lease_manager=lm,
+ service_registry=service_registry,
+ profile_registry=mock_profile_registry,
+ )
+ supervisor.register("heimdall", "http://heimdall:7701")
+
+ posted_urls = []
+
+ async def fake_http_post(url: str) -> bool:
+ posted_urls.append(url)
+ return True
+
+ supervisor._http_post = fake_http_post
+ await supervisor._run_idle_sweep()
+
+ assert len(posted_urls) == 1
+ assert posted_urls[0] == "http://heimdall:7701/services/vllm/stop"
+
+
+@pytest.mark.asyncio
+async def test_run_idle_sweep_skips_without_registry():
+ lm = LeaseManager()
+ supervisor = AgentSupervisor(lease_manager=lm)
+ # Should return immediately without error
+ await supervisor._run_idle_sweep()
diff --git a/tests/test_resources/test_client.py b/tests/test_resources/test_client.py
new file mode 100644
index 0000000..288fb64
--- /dev/null
+++ b/tests/test_resources/test_client.py
@@ -0,0 +1,94 @@
+import json
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+import httpretty
+from circuitforge_core.resources.client import CFOrchClient, Allocation
+
+_ALLOC_BODY = (
+ '{"allocation_id":"abc123","service":"vllm","node_id":"heimdall",'
+ '"gpu_id":0,"model":"Ouro-1.4B","url":"http://heimdall:8000","started":false,"warm":true}'
+)
+
+
+@httpretty.activate
+def test_sync_allocate_returns_allocation():
+ httpretty.register_uri(
+ httpretty.POST, "http://orch:7700/api/services/vllm/allocate",
+ body=_ALLOC_BODY, content_type="application/json",
+ )
+ httpretty.register_uri(
+ httpretty.DELETE, "http://orch:7700/api/services/vllm/allocations/abc123",
+ body='{"released":true}', content_type="application/json",
+ )
+ client = CFOrchClient("http://orch:7700")
+ with client.allocate("vllm", model_candidates=["Ouro-1.4B"], caller="test") as alloc:
+ assert isinstance(alloc, Allocation)
+ assert alloc.url == "http://heimdall:8000"
+ assert alloc.model == "Ouro-1.4B"
+ assert alloc.allocation_id == "abc123"
+ assert httpretty.last_request().method == "DELETE"
+
+
+@httpretty.activate
+def test_sync_allocate_ignores_404_on_release():
+ httpretty.register_uri(
+ httpretty.POST, "http://orch:7700/api/services/vllm/allocate",
+ body='{"allocation_id":"xyz","service":"vllm","node_id":"a","gpu_id":0,'
+ '"model":"m","url":"http://a:8000","started":false,"warm":false}',
+ content_type="application/json",
+ )
+ httpretty.register_uri(
+ httpretty.DELETE, "http://orch:7700/api/services/vllm/allocations/xyz",
+ status=404, body='{"detail":"not found"}', content_type="application/json",
+ )
+ client = CFOrchClient("http://orch:7700")
+ with client.allocate("vllm", model_candidates=["m"]) as alloc:
+ assert alloc.url == "http://a:8000"
+ # No exception raised — 404 on release is silently ignored
+
+
+@httpretty.activate
+def test_sync_allocate_raises_on_503():
+ httpretty.register_uri(
+ httpretty.POST, "http://orch:7700/api/services/vllm/allocate",
+ status=503, body='{"detail":"no capacity"}', content_type="application/json",
+ )
+ client = CFOrchClient("http://orch:7700")
+ with pytest.raises(RuntimeError, match="cf-orch allocation failed"):
+ with client.allocate("vllm", model_candidates=["m"]):
+ pass
+
+
+async def test_async_allocate_works():
+ # httpretty only patches stdlib sockets; httpx async uses anyio sockets so
+ # we mock httpx.AsyncClient directly instead.
+ alloc_data = {
+ "allocation_id": "a1", "service": "vllm", "node_id": "n",
+ "gpu_id": 0, "model": "m", "url": "http://n:8000",
+ "started": False, "warm": False,
+ }
+ release_data = {"released": True}
+
+ def _make_response(data, status_code=200):
+ resp = MagicMock()
+ resp.is_success = status_code < 400
+ resp.status_code = status_code
+ resp.json.return_value = data
+ return resp
+
+ mock_post = AsyncMock(return_value=_make_response(alloc_data))
+ mock_delete = AsyncMock(return_value=_make_response(release_data))
+
+ mock_async_client = MagicMock()
+ mock_async_client.post = mock_post
+ mock_async_client.delete = mock_delete
+ mock_async_client.__aenter__ = AsyncMock(return_value=mock_async_client)
+ mock_async_client.__aexit__ = AsyncMock(return_value=False)
+
+ with patch("httpx.AsyncClient", return_value=mock_async_client):
+ client = CFOrchClient("http://orch:7700")
+ async with client.allocate_async("vllm", model_candidates=["m"]) as alloc:
+ assert alloc.url == "http://n:8000"
+ assert alloc.allocation_id == "a1"
+ mock_delete.assert_called_once()
diff --git a/tests/test_resources/test_coordinator_allocate.py b/tests/test_resources/test_coordinator_allocate.py
new file mode 100644
index 0000000..eb9e078
--- /dev/null
+++ b/tests/test_resources/test_coordinator_allocate.py
@@ -0,0 +1,132 @@
+import pytest
+from unittest.mock import AsyncMock, MagicMock, patch
+from fastapi.testclient import TestClient
+from circuitforge_core.resources.coordinator.app import create_coordinator_app
+from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
+from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
+from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry
+from circuitforge_core.resources.coordinator.agent_supervisor import AgentRecord
+from circuitforge_core.resources.models import GpuInfo, NodeInfo
+
+
+def _make_supervisor_mock(online: bool = True):
+ sup = MagicMock()
+ record = AgentRecord(node_id="heimdall", agent_url="http://heimdall:7701")
+ record.gpus = [GpuInfo(0, "RTX 4000", 8192, 0, 8192)]
+ record.online = online
+ sup.online_agents.return_value = {"heimdall": record} if online else {}
+ sup.get_node_info.return_value = NodeInfo(
+ node_id="heimdall",
+ agent_url="http://heimdall:7701",
+ gpus=record.gpus,
+ last_heartbeat=0.0,
+ )
+ return sup
+
+
+@pytest.fixture
+def alloc_client():
+ lm = LeaseManager()
+ pr = ProfileRegistry()
+ sup = _make_supervisor_mock()
+ sr = ServiceRegistry()
+ app = create_coordinator_app(lease_manager=lm, profile_registry=pr, agent_supervisor=sup, service_registry=sr)
+ return TestClient(app), sup, sr
+
+
+def test_allocate_returns_allocation_id_and_url(alloc_client):
+ client, sup, sr = alloc_client
+ with patch("httpx.AsyncClient") as mock_http:
+ mock_resp = MagicMock()
+ mock_resp.is_success = True
+ mock_resp.json.return_value = {"running": True, "url": "http://heimdall:8000"}
+ mock_http.return_value.__aenter__.return_value.post = AsyncMock(return_value=mock_resp)
+
+ resp = client.post("/api/services/vllm/allocate", json={
+ "model_candidates": ["Ouro-1.4B"],
+ "ttl_s": 300.0,
+ "caller": "test",
+ })
+
+ assert resp.status_code == 200
+ data = resp.json()
+ assert "allocation_id" in data
+ assert data["service"] == "vllm"
+ assert data["node_id"] == "heimdall"
+ assert data["url"] == "http://heimdall:8000"
+
+
+def test_allocate_returns_503_when_no_online_nodes(alloc_client):
+ client, sup, sr = alloc_client
+ sup.online_agents.return_value = {}
+ resp = client.post("/api/services/vllm/allocate", json={"model_candidates": ["Ouro-1.4B"]})
+ assert resp.status_code == 503
+
+
+def test_allocate_returns_422_for_empty_candidates(alloc_client):
+ client, _, sr = alloc_client
+ resp = client.post("/api/services/vllm/allocate", json={"model_candidates": []})
+ assert resp.status_code == 422
+
+
+def test_allocate_returns_422_for_unknown_service(alloc_client):
+ client, _, sr = alloc_client
+ resp = client.post("/api/services/cf-made-up/allocate", json={"model_candidates": ["x"]})
+ assert resp.status_code == 422
+
+
+def test_allocate_records_in_registry(alloc_client):
+ client, sup, sr = alloc_client
+ with patch("httpx.AsyncClient") as mock_http:
+ mock_resp = MagicMock()
+ mock_resp.is_success = True
+ mock_resp.json.return_value = {"running": True, "url": "http://heimdall:8000"}
+ mock_http.return_value.__aenter__.return_value.post = AsyncMock(return_value=mock_resp)
+
+ resp = client.post("/api/services/vllm/allocate", json={
+ "model_candidates": ["Ouro-1.4B"],
+ "ttl_s": 300.0,
+ "caller": "test",
+ })
+
+ assert resp.status_code == 200
+ allocation_id = resp.json()["allocation_id"]
+
+ status_resp = client.get("/api/services/vllm/status")
+ assert status_resp.status_code == 200
+ status_data = status_resp.json()
+ assert status_data["service"] == "vllm"
+ alloc_ids = [a["allocation_id"] for a in status_data["allocations"]]
+ assert allocation_id in alloc_ids
+
+
+def test_release_allocation(alloc_client):
+ client, sup, sr = alloc_client
+ with patch("httpx.AsyncClient") as mock_http:
+ mock_resp = MagicMock()
+ mock_resp.is_success = True
+ mock_resp.json.return_value = {"running": True, "url": "http://heimdall:8000"}
+ mock_http.return_value.__aenter__.return_value.post = AsyncMock(return_value=mock_resp)
+
+ resp = client.post("/api/services/vllm/allocate", json={
+ "model_candidates": ["Ouro-1.4B"],
+ "ttl_s": 300.0,
+ "caller": "test",
+ })
+
+ assert resp.status_code == 200
+ allocation_id = resp.json()["allocation_id"]
+
+ del_resp = client.delete(f"/api/services/vllm/allocations/{allocation_id}")
+ assert del_resp.status_code == 200
+ assert del_resp.json() == {"released": True, "allocation_id": allocation_id}
+
+ status_resp = client.get("/api/services/vllm/status")
+ alloc_ids = [a["allocation_id"] for a in status_resp.json()["allocations"]]
+ assert allocation_id not in alloc_ids
+
+
+def test_release_allocation_not_found(alloc_client):
+ client, _, sr = alloc_client
+ resp = client.delete("/api/services/vllm/allocations/bad-id")
+ assert resp.status_code == 404
diff --git a/tests/test_resources/test_coordinator_app.py b/tests/test_resources/test_coordinator_app.py
index e8f6bbe..49eacbf 100644
--- a/tests/test_resources/test_coordinator_app.py
+++ b/tests/test_resources/test_coordinator_app.py
@@ -1,10 +1,14 @@
import pytest
from unittest.mock import MagicMock
+from pathlib import Path
from fastapi.testclient import TestClient
from circuitforge_core.resources.coordinator.app import create_coordinator_app
+from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
+from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry
from circuitforge_core.resources.models import GpuInfo, NodeInfo
+from circuitforge_core.resources.profiles.schema import load_profile
@pytest.fixture
@@ -32,6 +36,7 @@ def coordinator_client():
lease_manager=lease_manager,
profile_registry=profile_registry,
agent_supervisor=supervisor,
+ service_registry=ServiceRegistry(),
)
return TestClient(app), lease_manager
@@ -112,3 +117,67 @@ def test_dashboard_serves_html(coordinator_client):
assert "cf-orch" in resp.text
assert "/api/nodes" in resp.text
assert "/api/leases" in resp.text
+
+
+def test_online_agents_excludes_offline():
+ lm = LeaseManager()
+ sup = AgentSupervisor(lm)
+ sup.register("online_node", "http://a:7701")
+ sup.register("offline_node", "http://b:7701")
+ sup._agents["online_node"].online = True
+ sup._agents["offline_node"].online = False
+ result = sup.online_agents()
+ assert "online_node" in result
+ assert "offline_node" not in result
+
+
+def test_resident_keys_returns_set_of_node_service():
+ lm = LeaseManager()
+ lm.set_residents_for_node("heimdall", [("vllm", "Ouro-1.4B"), ("ollama", None)])
+ keys = lm.resident_keys()
+ assert keys == {"heimdall:vllm", "heimdall:ollama"}
+
+
+def test_single_gpu_8gb_profile_has_idle_stop_after_s():
+ profile = load_profile(
+ Path("circuitforge_core/resources/profiles/public/single-gpu-8gb.yaml")
+ )
+ vllm_svc = profile.services.get("vllm")
+ assert vllm_svc is not None
+ assert hasattr(vllm_svc, "idle_stop_after_s")
+ assert vllm_svc.idle_stop_after_s == 600
+
+
+def test_ensure_service_returns_503_when_vram_too_low():
+ """VRAM pre-flight guard fires before any HTTP request when free VRAM < max_mb // 2."""
+ # vllm max_mb = 5120 → threshold = 2560 MB; 100 MB free triggers 503.
+ lease_manager = LeaseManager()
+ lease_manager.register_gpu("low-vram-node", 0, 512)
+ profile_registry = ProfileRegistry()
+ supervisor = MagicMock()
+ supervisor.get_node_info.return_value = NodeInfo(
+ node_id="low-vram-node",
+ agent_url="http://localhost:7701",
+ gpus=[GpuInfo(gpu_id=0, name="GTX 1050",
+ vram_total_mb=512, vram_used_mb=412, vram_free_mb=100)],
+ last_heartbeat=0.0,
+ )
+ supervisor.all_nodes.return_value = []
+ app = create_coordinator_app(
+ lease_manager=lease_manager,
+ profile_registry=profile_registry,
+ agent_supervisor=supervisor,
+ service_registry=ServiceRegistry(),
+ )
+ client = TestClient(app)
+
+ resp = client.post("/api/services/vllm/ensure", json={
+ "node_id": "low-vram-node",
+ "gpu_id": 0,
+ "params": {"model": "some-model"},
+ })
+
+ assert resp.status_code == 503
+ assert "Insufficient VRAM" in resp.json()["detail"]
+ # Guard must fire before any agent HTTP call is attempted.
+ supervisor.get_node_info.assert_called_once_with("low-vram-node")
diff --git a/tests/test_resources/test_integration.py b/tests/test_resources/test_integration.py
index 814bb32..8fa94ad 100644
--- a/tests/test_resources/test_integration.py
+++ b/tests/test_resources/test_integration.py
@@ -11,6 +11,7 @@ from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor
from circuitforge_core.resources.coordinator.app import create_coordinator_app
+from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry
from circuitforge_core.resources.models import GpuInfo, NodeInfo
@@ -47,6 +48,7 @@ def system():
lease_manager=lease_manager,
profile_registry=profile_registry,
agent_supervisor=mock_supervisor,
+ service_registry=ServiceRegistry(),
)
client = TestClient(app)
return client, lease_manager
diff --git a/tests/test_resources/test_node_selector.py b/tests/test_resources/test_node_selector.py
new file mode 100644
index 0000000..9e18a3a
--- /dev/null
+++ b/tests/test_resources/test_node_selector.py
@@ -0,0 +1,56 @@
+import pytest
+from circuitforge_core.resources.coordinator.node_selector import select_node
+from circuitforge_core.resources.coordinator.agent_supervisor import AgentRecord
+from circuitforge_core.resources.models import GpuInfo
+from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
+
+
+def _make_agent(node_id: str, free_mb: int, online: bool = True) -> AgentRecord:
+ r = AgentRecord(node_id=node_id, agent_url=f"http://{node_id}:7701")
+ r.gpus = [GpuInfo(gpu_id=0, name="RTX", vram_total_mb=8192,
+ vram_used_mb=8192 - free_mb, vram_free_mb=free_mb)]
+ r.online = online
+ return r
+
+
+def test_selects_node_with_most_free_vram():
+ agents = {
+ "a": _make_agent("a", free_mb=2000),
+ "b": _make_agent("b", free_mb=6000),
+ }
+ registry = ProfileRegistry()
+ result = select_node(agents, "vllm", registry, resident_keys=set())
+ assert result == ("b", 0)
+
+
+def test_prefers_warm_node_even_with_less_free_vram():
+ agents = {
+ "a": _make_agent("a", free_mb=2000),
+ "b": _make_agent("b", free_mb=6000),
+ }
+ registry = ProfileRegistry()
+ result = select_node(agents, "vllm", registry, resident_keys={"a:vllm"})
+ assert result == ("a", 0)
+
+
+def test_excludes_offline_nodes():
+ agents = {
+ "a": _make_agent("a", free_mb=8000, online=False),
+ "b": _make_agent("b", free_mb=2000, online=True),
+ }
+ registry = ProfileRegistry()
+ result = select_node(agents, "vllm", registry, resident_keys=set())
+ assert result == ("b", 0)
+
+
+def test_returns_none_when_no_node_has_profile_for_service():
+ agents = {"a": _make_agent("a", free_mb=8000)}
+ registry = ProfileRegistry()
+ result = select_node(agents, "cf-nonexistent-service", registry, resident_keys=set())
+ assert result is None
+
+
+def test_returns_none_when_no_agents():
+ registry = ProfileRegistry()
+ result = select_node({}, "vllm", registry, resident_keys=set())
+ assert result is None
diff --git a/tests/test_resources/test_service_registry.py b/tests/test_resources/test_service_registry.py
new file mode 100644
index 0000000..dc73a9c
--- /dev/null
+++ b/tests/test_resources/test_service_registry.py
@@ -0,0 +1,86 @@
+import time
+import dataclasses
+import pytest
+from circuitforge_core.resources.coordinator.service_registry import (
+ ServiceRegistry, ServiceAllocation, ServiceInstance,
+)
+
+
+@pytest.fixture
+def registry():
+ return ServiceRegistry()
+
+
+def test_allocate_creates_allocation(registry):
+ alloc = registry.allocate(
+ service="vllm", node_id="heimdall", gpu_id=0,
+ model="Ouro-1.4B", url="http://heimdall:8000",
+ caller="test", ttl_s=300.0,
+ )
+ assert alloc.service == "vllm"
+ assert alloc.node_id == "heimdall"
+ assert alloc.allocation_id # non-empty UUID string
+
+
+def test_active_allocations_count(registry):
+ registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "a", 300.0)
+ registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "b", 300.0)
+ assert registry.active_allocations("vllm", "heimdall", 0) == 2
+
+
+def test_release_decrements_count(registry):
+ alloc = registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "a", 300.0)
+ registry.release(alloc.allocation_id)
+ assert registry.active_allocations("vllm", "heimdall", 0) == 0
+
+
+def test_release_nonexistent_returns_false(registry):
+ assert registry.release("nonexistent-id") is False
+
+
+def test_upsert_instance_sets_running_state(registry):
+ registry.upsert_instance("vllm", "heimdall", 0, state="running",
+ model="Ouro-1.4B", url="http://heimdall:8000")
+ instances = registry.all_instances()
+ assert len(instances) == 1
+ assert instances[0].state == "running"
+
+
+def test_release_last_alloc_marks_instance_idle(registry):
+ registry.upsert_instance("vllm", "heimdall", 0, state="running",
+ model="Ouro-1.4B", url="http://heimdall:8000")
+ alloc = registry.allocate("vllm", "heimdall", 0, "Ouro-1.4B", "http://heimdall:8000", "a", 300.0)
+ registry.release(alloc.allocation_id)
+ instance = registry.all_instances()[0]
+ assert instance.state == "idle"
+ assert instance.idle_since is not None
+
+
+def test_new_alloc_on_idle_instance_marks_it_running(registry):
+ registry.upsert_instance("vllm", "heimdall", 0, state="idle",
+ model="M", url="http://h:8000")
+ registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "x", 300.0)
+ assert registry.all_instances()[0].state == "running"
+
+
+def test_sweep_expired_allocations(registry):
+ # Register a running instance so idle-transition logic has something to act on.
+ registry.upsert_instance("vllm", "heimdall", 0, state="running",
+ model="M", url="http://h:8000")
+ # Create an allocation with a very short TTL (1 second).
+ alloc = registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "caller", ttl_s=1)
+ assert registry.active_allocations("vllm", "heimdall", 0) == 1
+
+ # Wait for TTL to elapse.
+ time.sleep(1.1)
+
+ expired = registry.sweep_expired_allocations()
+
+ # The allocation should have been swept.
+ assert alloc.allocation_id in expired
+ assert registry.active_allocations("vllm", "heimdall", 0) == 0
+
+ # The instance should have transitioned to idle since no allocations remain.
+ instance = registry.all_instances()[0]
+ assert instance.state == "idle"
+ assert instance.idle_since is not None