Merge pull request 'feat: auto service lifecycle — /allocate, NodeSelector, idle sweep, CFOrchClient' (#9) from feature/orch-auto-lifecycle into main

This commit is contained in:
pyr0ball 2026-04-02 14:11:36 -07:00
commit c5e12b74f2
22 changed files with 1621 additions and 36 deletions

View file

@ -3,12 +3,15 @@ LLM abstraction layer with priority fallback chain.
Reads config from ~/.config/circuitforge/llm.yaml.
Tries backends in order; falls back on any error.
"""
import logging
import os
import yaml
import requests
from pathlib import Path
from openai import OpenAI
logger = logging.getLogger(__name__)
CONFIG_PATH = Path.home() / ".config" / "circuitforge" / "llm.yaml"
@ -38,6 +41,33 @@ class LLMRouter:
models = client.models.list()
return models.data[0].id
def _try_cf_orch_alloc(self, backend: dict) -> "tuple | None":
"""
If backend config has a cf_orch block and CF_ORCH_URL is set (env takes
precedence over yaml url), allocate via cf-orch and return (ctx, alloc).
Returns None if not configured or allocation fails.
Caller MUST call ctx.__exit__(None, None, None) in a finally block.
"""
import os
orch_cfg = backend.get("cf_orch")
if not orch_cfg:
return None
orch_url = os.environ.get("CF_ORCH_URL", orch_cfg.get("url", ""))
if not orch_url:
return None
try:
from circuitforge_core.resources.client import CFOrchClient
client = CFOrchClient(orch_url)
service = orch_cfg.get("service", "vllm")
candidates = orch_cfg.get("model_candidates", [])
ttl_s = float(orch_cfg.get("ttl_s", 3600.0))
ctx = client.allocate(service, model_candidates=candidates, ttl_s=ttl_s, caller="llm-router")
alloc = ctx.__enter__()
return (ctx, alloc)
except Exception as exc:
logger.warning("[LLMRouter] cf_orch allocation failed, using base_url directly: %s", exc)
return None
def complete(self, prompt: str, system: str | None = None,
model_override: str | None = None,
fallback_order: list[str] | None = None,
@ -105,6 +135,12 @@ class LLMRouter:
if not self._is_reachable(backend["base_url"]):
print(f"[LLMRouter] {name}: unreachable, skipping")
continue
# --- cf_orch: optionally override base_url with coordinator-allocated URL ---
orch_ctx = orch_alloc = None
orch_result = self._try_cf_orch_alloc(backend)
if orch_result is not None:
orch_ctx, orch_alloc = orch_result
backend = {**backend, "base_url": orch_alloc.url + "/v1"}
try:
client = OpenAI(
base_url=backend["base_url"],
@ -136,6 +172,12 @@ class LLMRouter:
except Exception as e:
print(f"[LLMRouter] {name}: error — {e}, trying next")
continue
finally:
if orch_ctx is not None:
try:
orch_ctx.__exit__(None, None, None)
except Exception:
pass
elif backend["type"] == "anthropic":
api_key = os.environ.get(backend["api_key_env"], "")

View file

@ -0,0 +1 @@
from circuitforge_core.resources.client import CFOrchClient, Allocation # noqa: F401

View file

@ -44,11 +44,17 @@ def start(
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor
from circuitforge_core.resources.coordinator.app import create_coordinator_app
from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry
from circuitforge_core.resources.agent.gpu_monitor import GpuMonitor
lease_manager = LeaseManager()
profile_registry = ProfileRegistry()
supervisor = AgentSupervisor(lease_manager=lease_manager)
service_registry = ServiceRegistry()
supervisor = AgentSupervisor(
lease_manager=lease_manager,
service_registry=service_registry,
profile_registry=profile_registry,
)
monitor = GpuMonitor()
gpus = monitor.poll()
@ -79,6 +85,7 @@ def start(
lease_manager=lease_manager,
profile_registry=profile_registry,
agent_supervisor=supervisor,
service_registry=service_registry,
)
typer.echo(f"Starting cf-orch coordinator on {host}:{port}")
@ -92,6 +99,7 @@ def agent(
host: str = "0.0.0.0",
port: int = 7701,
advertise_host: Optional[str] = None,
profile: Annotated[Optional[Path], typer.Option(help="Profile YAML path")] = None,
) -> None:
"""Start a cf-orch node agent and self-register with the coordinator.
@ -101,10 +109,11 @@ def agent(
Use --advertise-host to override the IP the coordinator should use to
reach this agent (e.g. on a multi-homed or NATted host).
"""
import asyncio
import threading
import httpx
from circuitforge_core.resources.agent.app import create_agent_app
from circuitforge_core.resources.agent.service_manager import ServiceManager
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
# The URL the coordinator should use to reach this agent.
reach_host = advertise_host or ("127.0.0.1" if host in ("0.0.0.0", "::") else host)
@ -132,7 +141,18 @@ def agent(
# Fire registration in a daemon thread so uvicorn.run() can start blocking immediately.
threading.Thread(target=_register_in_background, daemon=True).start()
agent_app = create_agent_app(node_id=node_id)
service_manager = None
try:
from circuitforge_core.resources.agent.gpu_monitor import GpuMonitor
pr = ProfileRegistry()
gpus = GpuMonitor().poll()
p = pr.load(Path(profile)) if profile else pr.auto_detect(gpus)
service_manager = ServiceManager(node_id=node_id, profile=p, advertise_host=reach_host)
typer.echo(f"ServiceManager ready with profile: {p.name}")
except Exception as exc:
typer.echo(f"Warning: ServiceManager unavailable ({exc})", err=True)
agent_app = create_agent_app(node_id=node_id, service_manager=service_manager)
typer.echo(f"Starting cf-orch agent [{node_id}] on {host}:{port}")
uvicorn.run(agent_app, host=host, port=port)

View file

@ -0,0 +1,126 @@
from __future__ import annotations
import logging
from contextlib import contextmanager, asynccontextmanager
from dataclasses import dataclass
import httpx
logger = logging.getLogger(__name__)
@dataclass
class Allocation:
    """A live service allocation handed out by the cf-orch coordinator."""

    allocation_id: str  # coordinator-issued ID; used to DELETE/release the allocation
    service: str        # service name requested (e.g. "vllm")
    node_id: str        # node the service instance runs on
    gpu_id: int         # GPU index on that node
    model: str | None   # model reported by the coordinator, if any
    url: str            # inference endpoint base URL
    started: bool       # coordinator reported a fresh start for this request
    warm: bool          # coordinator reported the service was already resident
class CFOrchClient:
    """Client for the cf-orch coordinator's service-allocation API.

    Sync usage (in LLMRouter or other sync code)::

        client = CFOrchClient(os.environ["CF_ORCH_URL"])
        with client.allocate("vllm", model_candidates=["Ouro-1.4B"]) as alloc:
            ...  # alloc.url is the inference endpoint

    Async usage (in FastAPI apps)::

        async with client.allocate_async("vllm", model_candidates=["Ouro-1.4B"]) as alloc:
            ...

    Raises:
        ValueError: immediately if ``coordinator_url`` is empty.
    """

    def __init__(self, coordinator_url: str) -> None:
        """Store the normalized coordinator base URL (trailing slash stripped)."""
        if not coordinator_url:
            raise ValueError("coordinator_url is empty — cf-orch not configured")
        # Strip the trailing slash so path joins below stay clean.
        self._url = coordinator_url.rstrip("/")

    def _build_body(self, model_candidates: list[str] | None, ttl_s: float, caller: str) -> dict:
        """JSON body for ``POST /api/services/{service}/allocate``."""
        return {
            "model_candidates": model_candidates or [],
            "ttl_s": ttl_s,
            "caller": caller,
        }

    def _parse_allocation(self, data: dict, service: str) -> Allocation:
        """Build an :class:`Allocation` from the coordinator's JSON response."""
        return Allocation(
            allocation_id=data["allocation_id"],
            service=service,
            node_id=data["node_id"],
            gpu_id=data["gpu_id"],
            model=data.get("model"),
            url=data["url"],
            started=data.get("started", False),
            warm=data.get("warm", False),
        )

    @contextmanager
    def allocate(
        self,
        service: str,
        *,
        model_candidates: list[str] | None = None,
        ttl_s: float = 3600.0,
        caller: str = "",
    ):
        """Sync context manager. Allocates on enter, releases on exit.

        Raises:
            RuntimeError: when the coordinator returns a non-2xx response.
        """
        resp = httpx.post(
            f"{self._url}/api/services/{service}/allocate",
            json=self._build_body(model_candidates, ttl_s, caller),
            timeout=120.0,  # generous: allocation may need to start a model server
        )
        if not resp.is_success:
            raise RuntimeError(
                f"cf-orch allocation failed for {service!r}: "
                # (fix) separator added — previously the status code and body
                # were concatenated ("HTTP 500Internal ...").
                f"HTTP {resp.status_code}: {resp.text[:200]}"
            )
        alloc = self._parse_allocation(resp.json(), service)
        try:
            yield alloc
        finally:
            # Release is best-effort; a failed DELETE is only logged.
            try:
                httpx.delete(
                    f"{self._url}/api/services/{service}/allocations/{alloc.allocation_id}",
                    timeout=10.0,
                )
            except Exception as exc:
                logger.debug("cf-orch release failed (non-fatal): %s", exc)

    @asynccontextmanager
    async def allocate_async(
        self,
        service: str,
        *,
        model_candidates: list[str] | None = None,
        ttl_s: float = 3600.0,
        caller: str = "",
    ):
        """Async context manager. Allocates on enter, releases on exit.

        Raises:
            RuntimeError: when the coordinator returns a non-2xx response.
        """
        async with httpx.AsyncClient(timeout=120.0) as client:
            resp = await client.post(
                f"{self._url}/api/services/{service}/allocate",
                json=self._build_body(model_candidates, ttl_s, caller),
            )
            if not resp.is_success:
                raise RuntimeError(
                    f"cf-orch allocation failed for {service!r}: "
                    # (fix) separator added, matching the sync path.
                    f"HTTP {resp.status_code}: {resp.text[:200]}"
                )
            alloc = self._parse_allocation(resp.json(), service)
            try:
                yield alloc
            finally:
                # Release is best-effort; a failed DELETE is only logged.
                try:
                    await client.delete(
                        f"{self._url}/api/services/{service}/allocations/{alloc.allocation_id}",
                        timeout=10.0,
                    )
                except Exception as exc:
                    logger.debug("cf-orch async release failed (non-fatal): %s", exc)

View file

@ -8,7 +8,9 @@ from dataclasses import dataclass, field
import httpx
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
from circuitforge_core.resources.models import GpuInfo, NodeInfo
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry
from circuitforge_core.resources.models import GpuInfo, NodeInfo, ResidentAllocation
logger = logging.getLogger(__name__)
@ -26,15 +28,27 @@ class AgentRecord:
class AgentSupervisor:
def __init__(self, lease_manager: LeaseManager) -> None:
def __init__(
self,
lease_manager: LeaseManager,
service_registry: ServiceRegistry | None = None,
profile_registry: ProfileRegistry | None = None,
) -> None:
self._agents: dict[str, AgentRecord] = {}
self._lease_manager = lease_manager
self._running = False
self._service_registry = service_registry
self._profile_registry = profile_registry
self._heartbeat_tick = 0
def register(self, node_id: str, agent_url: str) -> None:
    """Register a new agent node, or refresh its URL on re-registration.

    Idempotent: re-registering with the same URL is a no-op; a changed
    URL (e.g. agent restarted on another address) is updated in place.
    """
    if node_id not in self._agents:
        self._agents[node_id] = AgentRecord(node_id=node_id, agent_url=agent_url)
        logger.info("Registered agent node: %s @ %s", node_id, agent_url)
    else:
        if self._agents[node_id].agent_url != agent_url:
            self._agents[node_id].agent_url = agent_url
            # (fix) the two placeholders previously had no separator,
            # producing logs like "node1http://...".
            logger.info("Updated agent URL for %s: %s", node_id, agent_url)
def get_node_info(self, node_id: str) -> NodeInfo | None:
record = self._agents.get(node_id)
@ -58,15 +72,27 @@ class AgentSupervisor:
for r in self._agents.values()
]
def online_agents(self) -> "dict[str, AgentRecord]":
"""Return only currently-online agents, keyed by node_id."""
return {nid: rec for nid, rec in self._agents.items() if rec.online}
async def poll_agent(self, node_id: str) -> bool:
record = self._agents.get(node_id)
if record is None:
return False
try:
async with httpx.AsyncClient(timeout=_AGENT_TIMEOUT_S) as client:
resp = await client.get(f"{record.agent_url}/gpu-info")
resp.raise_for_status()
data = resp.json()
gpu_resp = await client.get(f"{record.agent_url}/gpu-info")
gpu_resp.raise_for_status()
# Resident-info is best-effort — older agents may not have the endpoint.
try:
res_resp = await client.get(f"{record.agent_url}/resident-info")
resident_data = res_resp.json() if res_resp.is_success else {}
except Exception:
resident_data = {}
data = gpu_resp.json()
gpus = [
GpuInfo(
gpu_id=g["gpu_id"],
@ -82,6 +108,13 @@ class AgentSupervisor:
record.online = True
for gpu in gpus:
self._lease_manager.register_gpu(node_id, gpu.gpu_id, gpu.vram_total_mb)
residents = [
(r["service"], r.get("model_name"))
for r in resident_data.get("residents", [])
]
self._lease_manager.set_residents_for_node(node_id, residents)
return True
except Exception as exc:
logger.warning("Agent %s unreachable: %s", node_id, exc)
@ -91,10 +124,58 @@ class AgentSupervisor:
async def poll_all(self) -> None:
await asyncio.gather(*[self.poll_agent(nid) for nid in self._agents])
def _build_idle_stop_config(self) -> dict[str, int]:
if self._profile_registry is None:
return {}
config: dict[str, int] = {}
for profile in self._profile_registry.list_public():
for svc_name, svc in profile.services.items():
if svc.idle_stop_after_s > 0:
existing = config.get(svc_name, 0)
config[svc_name] = min(existing, svc.idle_stop_after_s) if existing > 0 else svc.idle_stop_after_s
return config
async def _http_post(self, url: str) -> bool:
    """Fire a POST at *url*; True on a success status, False on any failure."""
    try:
        async with httpx.AsyncClient(timeout=10.0) as http:
            response = await http.post(url)
    except Exception as exc:
        # Network/timeout problems are logged, never raised to the sweep loop.
        logger.warning("HTTP POST %s failed: %s", url, exc)
        return False
    return response.is_success
async def _run_idle_sweep(self) -> None:
    """Expire TTL'd allocations, then stop instances idle past their timeout.

    No-op when no service registry is wired in; the idle-stop half is
    additionally skipped when no profile defines idle_stop_after_s > 0.
    """
    if self._service_registry is None:
        return
    # First pass: drop allocations whose TTL has elapsed.
    expired = self._service_registry.sweep_expired_allocations()
    if expired:
        logger.info("TTL sweep: expired %d allocation(s): %s", len(expired), expired)
    idle_stop_config = self._build_idle_stop_config()
    if not idle_stop_config:
        # No service has an idle-stop policy — nothing more to do.
        return
    # Second pass: stop instances that have been idle longer than allowed.
    timed_out = self._service_registry.idle_past_timeout(idle_stop_config)
    for instance in timed_out:
        node_info = self.get_node_info(instance.node_id)
        if node_info is None:
            # Node is unknown/offline; we can't reach its agent to stop anything.
            continue
        stop_url = f"{node_info.agent_url}/services/{instance.service}/stop"
        logger.info(
            "Idle sweep: stopping %s on %s gpu%s (idle timeout)",
            instance.service, instance.node_id, instance.gpu_id,
        )
        success = await self._http_post(stop_url)
        if success:
            # Only record the stop once the agent confirmed it.
            self._service_registry.mark_stopped(
                instance.service, instance.node_id, instance.gpu_id
            )
async def run_heartbeat_loop(self) -> None:
    """Poll all agents every heartbeat; run the idle/TTL sweep every 3rd tick.

    Runs until stop() clears the running flag.
    """
    self._running = True
    while self._running:
        await self.poll_all()
        self._heartbeat_tick += 1
        # Sweep less often than we poll — every third heartbeat.
        if self._heartbeat_tick % 3 == 0:
            await self._run_idle_sweep()
        await asyncio.sleep(_HEARTBEAT_INTERVAL_S)
def stop(self) -> None:

View file

@ -11,7 +11,9 @@ from pydantic import BaseModel
from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor
from circuitforge_core.resources.coordinator.eviction_engine import EvictionEngine
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
from circuitforge_core.resources.coordinator.node_selector import select_node
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry
_DASHBOARD_HTML = (Path(__file__).parent / "dashboard.html").read_text()
@ -30,10 +32,29 @@ class NodeRegisterRequest(BaseModel):
agent_url: str # e.g. "http://10.1.10.71:7701"
class ServiceEnsureRequest(BaseModel):
    """Request body for POST /api/services/{service}/ensure (caller picks the node)."""

    node_id: str                      # target node (must be registered)
    gpu_id: int = 0                   # GPU index on that node
    params: dict[str, str] = {}       # extra start parameters forwarded to the agent
    ttl_s: float = 3600.0
    # Ordered list of model names to try; falls back down the list if VRAM is tight.
    # The "model" key in params is used if this list is empty.
    model_candidates: list[str] = []
class ServiceAllocateRequest(BaseModel):
    """Request body for POST /api/services/{service}/allocate (coordinator picks the node)."""

    model_candidates: list[str] = []  # ordered fallback list; must be non-empty
    gpu_id: int | None = None         # pin to a specific GPU; None = auto-select via NodeSelector
    params: dict[str, str] = {}       # extra start parameters forwarded to the agent
    ttl_s: float = 3600.0             # allocation lifetime before the TTL sweep expires it
    caller: str = ""                  # free-form caller identifier, recorded with the allocation
def create_coordinator_app(
lease_manager: LeaseManager,
profile_registry: ProfileRegistry,
agent_supervisor: AgentSupervisor,
service_registry: ServiceRegistry,
) -> FastAPI:
eviction_engine = EvictionEngine(lease_manager=lease_manager)
@ -95,6 +116,20 @@ def create_coordinator_app(
]
}
@app.get("/api/resident")
def get_residents() -> dict[str, Any]:
    """List warm/resident models reported by the lease manager, across all nodes."""
    return {
        "residents": [
            {
                "service": r.service,
                "node_id": r.node_id,
                "model_name": r.model_name,
                "first_seen": r.first_seen,
            }
            for r in lease_manager.all_residents()
        ]
    }
@app.get("/api/leases")
def get_leases() -> dict[str, Any]:
return {
@ -155,4 +190,232 @@ def create_coordinator_app(
raise HTTPException(status_code=404, detail=f"Lease {lease_id!r} not found")
return {"released": True, "lease_id": lease_id}
@app.post("/api/services/{service}/ensure")
async def ensure_service(service: str, req: ServiceEnsureRequest) -> dict[str, Any]:
    """
    Ensure a managed service is running on the caller-chosen node.

    A coarse pre-flight check rejects the whole request (503) when the
    target GPU's free VRAM is below half the service's profile max_mb;
    otherwise each model candidate is tried in order against the agent's
    start endpoint until one succeeds. The selected model is returned.

    Raises:
        422: unknown node_id, or no model specified.
        502: agent unreachable.
        503: VRAM pre-flight failed, or every candidate was rejected.
    """
    import httpx
    node_info = agent_supervisor.get_node_info(req.node_id)
    if node_info is None:
        raise HTTPException(422, detail=f"Unknown node_id {req.node_id!r}")
    # Resolve candidate list — fall back to params["model"] if not specified.
    candidates: list[str] = req.model_candidates or (
        [req.params["model"]] if "model" in req.params else []
    )
    if not candidates:
        raise HTTPException(422, detail="No model specified: set params.model or model_candidates")
    # Live free VRAM on the target GPU (used for pre-flight filtering).
    gpu = next((g for g in node_info.gpus if g.gpu_id == req.gpu_id), None)
    free_mb = gpu.vram_free_mb if gpu else 0
    # Profile max_mb for the service gives us the VRAM ceiling for this slot.
    # We use half of max_mb as the threshold — a rough proxy, since a
    # fully-loaded model typically needs ~50-80% of its param size in VRAM.
    service_max_mb = 0
    for p in profile_registry.list_public():
        svc = p.services.get(service)
        if svc:
            # First public profile that defines this service wins.
            service_max_mb = svc.max_mb
            break
    # Coarse whole-request check (not per-candidate): bail out early when
    # free VRAM is under half the service's max_mb ceiling.
    if service_max_mb > 0 and free_mb < service_max_mb // 2:
        raise HTTPException(
            503,
            detail=f"Insufficient VRAM on gpu {req.gpu_id}: {free_mb}MB free, need at least {service_max_mb // 2}MB",
        )
    last_error: str = ""
    async with httpx.AsyncClient(timeout=120.0) as client:
        for model in candidates:
            params_with_model = {**req.params, "model": model}
            try:
                start_resp = await client.post(
                    f"{node_info.agent_url}/services/{service}/start",
                    json={"gpu_id": req.gpu_id, "params": params_with_model},
                )
                if start_resp.is_success:
                    data = start_resp.json()
                    return {
                        "service": service,
                        "node_id": req.node_id,
                        "gpu_id": req.gpu_id,
                        "model": model,
                        "url": data.get("url"),
                        "running": data.get("running", False),
                    }
                # Agent rejected this model — record the error, try the next.
                last_error = start_resp.text
            except httpx.HTTPError as exc:
                # Transport-level failure aborts the candidate loop entirely.
                raise HTTPException(502, detail=f"Agent unreachable: {exc}")
    raise HTTPException(
        503,
        detail=f"All model candidates exhausted for {service!r}. Last error: {last_error}",
    )
@app.post("/api/services/{service}/allocate")
async def allocate_service(service: str, req: ServiceAllocateRequest) -> dict[str, Any]:
    """
    Allocate a managed service — the coordinator picks the node/GPU via
    select_node unless req.gpu_id pins one explicitly.

    Each model candidate is tried in order on the chosen node; on success
    the allocation is recorded in the service registry (with a TTL) and
    the response carries allocation_id, endpoint URL, the chosen model,
    and warm/started flags.

    Raises:
        422: empty model_candidates, unknown service, or bad gpu_id pin.
        502: agent unreachable.
        503: no node has capacity, or every model candidate failed.
    """
    import httpx
    if not req.model_candidates:
        raise HTTPException(422, detail="model_candidates must be non-empty")
    # Validate service is known in at least one profile, regardless of gpu_id
    if not any(service in p.services for p in profile_registry.list_public()):
        raise HTTPException(422, detail=f"Unknown service {service!r} — not in any profile")
    residents = lease_manager.resident_keys()
    if req.gpu_id is None:
        # Automatic placement across all online agents.
        online = agent_supervisor.online_agents()
        placement = select_node(online, service, profile_registry, residents)
        if placement is None:
            raise HTTPException(
                503,
                detail=f"No online node has capacity for service {service!r}",
            )
        node_id, gpu_id = placement
    else:
        # Caller pinned a GPU: pick the first online node exposing that gpu_id.
        online = agent_supervisor.online_agents()
        node_id = next(
            (nid for nid, rec in online.items()
             if any(g.gpu_id == req.gpu_id for g in rec.gpus)),
            None,
        )
        if node_id is None:
            raise HTTPException(422, detail=f"No online node has gpu_id={req.gpu_id}")
        gpu_id = req.gpu_id
    node_info = agent_supervisor.get_node_info(node_id)
    if node_info is None:
        raise HTTPException(422, detail=f"Node {node_id!r} not found")
    # Warm = the chosen node already has this service resident.
    warm = f"{node_id}:{service}" in residents
    async with httpx.AsyncClient(timeout=120.0) as client:
        last_error = ""
        for model in req.model_candidates:
            try:
                resp = await client.post(
                    f"{node_info.agent_url}/services/{service}/start",
                    json={"gpu_id": gpu_id, "params": {**req.params, "model": model}},
                )
                if resp.is_success:
                    data = resp.json()
                    # Record the allocation so TTL sweep / release can find it.
                    alloc = service_registry.allocate(
                        service=service,
                        node_id=node_id,
                        gpu_id=gpu_id,
                        model=model,
                        caller=req.caller,
                        url=data.get("url", ""),
                        ttl_s=req.ttl_s,
                    )
                    return {
                        "allocation_id": alloc.allocation_id,
                        "service": service,
                        "node_id": node_id,
                        "gpu_id": gpu_id,
                        "model": model,
                        "url": data.get("url"),
                        "started": not warm,
                        "warm": warm,
                    }
                # Agent rejected this model — record the error, try the next.
                last_error = resp.text
            except httpx.HTTPError as exc:
                # Transport-level failure aborts the candidate loop entirely.
                raise HTTPException(502, detail=f"Agent unreachable: {exc}")
    raise HTTPException(
        503,
        detail=f"All model candidates exhausted for {service!r}. Last error: {last_error}",
    )
@app.delete("/api/services/{service}/allocations/{allocation_id}")
async def release_allocation(service: str, allocation_id: str) -> dict[str, Any]:
    """Release an allocation; 404 unless it exists AND belongs to *service*."""
    alloc = service_registry.get_allocation(allocation_id)
    if alloc is None or alloc.service != service:
        raise HTTPException(404, detail=f"Allocation {allocation_id!r} not found for service {service!r}")
    # Re-check via release(): the allocation may race away between lookups.
    if not service_registry.release(allocation_id):
        raise HTTPException(404, detail=f"Allocation {allocation_id!r} not found")
    return {"released": True, "allocation_id": allocation_id}
@app.get("/api/services/{service}/status")
def get_service_status(service: str) -> dict[str, Any]:
    """Instances and live allocations for a single service."""
    instance_rows = []
    for inst in service_registry.all_instances():
        if inst.service != service:
            continue
        instance_rows.append({
            "node_id": inst.node_id,
            "gpu_id": inst.gpu_id,
            "state": inst.state,
            "model": inst.model,
            "url": inst.url,
            "idle_since": inst.idle_since,
        })
    allocation_rows = []
    for alloc in service_registry.all_allocations():
        if alloc.service != service:
            continue
        allocation_rows.append({
            "allocation_id": alloc.allocation_id,
            "node_id": alloc.node_id,
            "gpu_id": alloc.gpu_id,
            "model": alloc.model,
            "caller": alloc.caller,
            "url": alloc.url,
            "expires_at": alloc.expires_at,
        })
    return {
        "service": service,
        "instances": instance_rows,
        "allocations": allocation_rows,
    }
@app.get("/api/services")
def list_services() -> dict[str, Any]:
    """Flat listing of every managed service instance across all nodes."""
    rows = []
    for inst in service_registry.all_instances():
        rows.append({
            "service": inst.service,
            "node_id": inst.node_id,
            "gpu_id": inst.gpu_id,
            "state": inst.state,
            "model": inst.model,
            "url": inst.url,
        })
    return {"services": rows}
@app.delete("/api/services/{service}")
async def stop_service(service: str, node_id: str) -> dict[str, Any]:
    """Stop a managed service on the given node (proxies to the node agent).

    Raises:
        422: unknown node_id.
        502: agent unreachable or returned an error status.
    """
    node_info = agent_supervisor.get_node_info(node_id)
    if node_info is None:
        raise HTTPException(422, detail=f"Unknown node_id {node_id!r}")
    import httpx
    async with httpx.AsyncClient(timeout=30.0) as client:
        try:
            resp = await client.post(f"{node_info.agent_url}/services/{service}/stop")
            # raise_for_status folds agent error statuses into the 502 path below.
            resp.raise_for_status()
            return {"service": service, "node_id": node_id, "stopped": resp.json().get("stopped", False)}
        except httpx.HTTPError as exc:
            raise HTTPException(502, detail=f"Agent unreachable: {exc}")
return app

View file

@ -52,8 +52,9 @@
.gpu-node { font-size: 0.75em; font-weight: 700; color: var(--indigo); margin-bottom: 1px; }
.gpu-offline .gpu-node { color: var(--orange); }
.gpu-name { font-size: 0.78em; color: var(--text); margin-bottom: 0.4rem; }
.vram-track { background: var(--bg); border-radius: var(--radius-sm); height: 6px; margin-bottom: 0.3rem; }
.vram-fill { height: 100%; border-radius: var(--radius-sm); transition: width 0.4s; }
.vram-track { position: relative; background: var(--bg); border-radius: var(--radius-sm); height: 6px; margin-bottom: 0.3rem; overflow: hidden; }
.vram-leased { position: absolute; left: 0; top: 0; height: 100%; background: var(--cyan); transition: width 0.4s; }
.vram-resident { position: absolute; top: 0; height: 100%; background: var(--amber); transition: left 0.4s, width 0.4s; }
.vram-label { font-size: 0.72em; color: var(--muted); margin-bottom: 0.25rem; }
.gpu-status { font-size: 0.72em; }
.gpu-status.idle { color: var(--green); }
@ -62,21 +63,30 @@
.gpu-status.offline { color: var(--orange); }
.spark-track { height: 24px; background: var(--bg); border-radius: var(--radius-sm); margin-top: 0.4rem; overflow: hidden; }
/* leases */
#leases-table { width: 100%; border-collapse: collapse; background: var(--bg2); border: 1px solid var(--border); border-radius: var(--radius); overflow: hidden; margin-bottom: 1rem; }
#leases-table th { background: var(--bg3); color: var(--dim); font-size: 0.72em; font-weight: 600; text-transform: uppercase; letter-spacing: 0.05em; padding: 0.4rem 0.6rem; text-align: left; border-bottom: 1px solid var(--border); }
#leases-table td { padding: 0.35rem 0.6rem; border-bottom: 1px solid var(--border-dim); font-size: 0.8em; vertical-align: middle; }
#leases-table tr:last-child td { border-bottom: none; }
/* shared table base */
.cf-table { width: 100%; border-collapse: collapse; background: var(--bg2); border: 1px solid var(--border); border-radius: var(--radius); overflow: hidden; margin-bottom: 1rem; }
.cf-table th { background: var(--bg3); color: var(--dim); font-size: 0.72em; font-weight: 600; text-transform: uppercase; letter-spacing: 0.05em; padding: 0.4rem 0.6rem; text-align: left; border-bottom: 1px solid var(--border); }
.cf-table td { padding: 0.35rem 0.6rem; border-bottom: 1px solid var(--border-dim); font-size: 0.8em; vertical-align: middle; }
.cf-table tr:last-child td { border-bottom: none; }
.td-service { color: var(--indigo); font-weight: 600; }
.td-node { color: var(--muted); }
.td-mb { color: var(--text); }
.td-priority { color: var(--amber); }
.td-model { color: var(--cyan); font-size: 0.75em; }
.td-warm { color: var(--amber); }
.td-none { color: var(--dim); font-style: italic; }
.ttl-wrap { display: flex; align-items: center; gap: 0.5rem; }
.ttl-label { color: var(--cyan); font-variant-numeric: tabular-nums; white-space: nowrap; }
.ttl-track { flex: 1; background: var(--bg); border-radius: var(--radius-sm); height: 4px; }
.ttl-fill { height: 100%; border-radius: var(--radius-sm); background: var(--cyan); transition: width 0.4s; }
/* service state classes */
.state-running { color: #2ecc40; }
.state-idle { color: #ff851b; }
.state-stopped { color: #aaa; }
.state-starting { color: #0074d9; }
.state-unknown { color: #ff4136; }
/* error */
#error-banner { display: none; background: rgba(248,81,73,.1); border: 1px solid var(--red); border-radius: var(--radius); color: var(--red); padding: 0.5rem 0.75rem; font-size: 0.82em; margin-bottom: 1rem; }
@ -102,8 +112,20 @@
<div class="section-label">GPU Nodes</div>
<div id="gpu-grid"></div>
<div id="services-section">
<div class="section-label">Service Instances</div>
<table class="cf-table" id="services-table">
<thead>
<tr>
<th>Service</th><th>Node</th><th>GPU</th><th>State</th><th>Model</th><th>URL</th>
</tr>
</thead>
<tbody id="services-body"></tbody>
</table>
</div>
<div class="section-label">Active Leases</div>
<table id="leases-table">
<table class="cf-table" id="leases-table">
<thead>
<tr>
<th>Service</th><th>Node / GPU</th><th>VRAM</th><th>Priority</th><th>TTL / Expires</th>
@ -112,10 +134,22 @@
<tbody id="leases-body"></tbody>
</table>
<div class="section-label">Warm Models</div>
<table class="cf-table" id="resident-table">
<thead>
<tr>
<th>Service</th><th>Node</th><th>Model</th><th>Warm Since</th>
</tr>
</thead>
<tbody id="resident-body"></tbody>
</table>
<footer>
<span>cf-orch · circuitforge-core</span>
<a href="/api/nodes" target="_blank">/api/nodes</a>
<a href="/api/leases" target="_blank">/api/leases</a>
<a href="/api/resident" target="_blank">/api/resident</a>
<a href="/api/services" target="_blank">/api/services</a>
<a href="/api/health" target="_blank">/api/health</a>
</footer>
@ -198,6 +232,41 @@ setInterval(() => {
document.getElementById('countdown').textContent = countdown;
}, 1000);
// ── state class helper ───────────────────────────────────────────
function stateClass(state) {
const map = { running: 'state-running', idle: 'state-idle', stopped: 'state-stopped', starting: 'state-starting' };
return map[state] || 'state-unknown';
}
// ── render: services table ───────────────────────────────────────
// Rebuild the Service Instances table body from /api/services data.
// Shows a single placeholder row spanning all 6 columns when empty.
function renderServices(services) {
  const tbody = document.getElementById('services-body');
  if (!services || services.length === 0) {
    const tr = document.createElement('tr');
    const td = el('td', { cls: 'td-none', text: 'No service instances registered.' });
    td.setAttribute('colspan', '6');
    tr.appendChild(td);
    setChildren(tbody, tr);
    return;
  }
  // One row per instance: service / node / gpu / state / model / url.
  const rows = services.map(svc => {
    const tr = document.createElement('tr');
    const fields = [
      { text: svc.service, cls: 'td-service' },
      { text: svc.node_id, cls: 'td-node' },
      { text: String(svc.gpu_id), cls: 'td-mb' },
      { text: svc.state, cls: stateClass(svc.state) },   // colour-coded by state
      { text: svc.model || '\u2014', cls: 'td-model' },  // em-dash when absent
      { text: svc.url || '\u2014', cls: 'td-node' },
    ];
    fields.forEach(f => tr.appendChild(el('td', { cls: f.cls, text: f.text })));
    return tr;
  });
  setChildren(tbody, ...rows);
}
// ── render: health strip ─────────────────────────────────────────
function renderHealth(ok) {
const strip = document.getElementById('health-strip');
@ -206,7 +275,8 @@ function renderHealth(ok) {
}
// ── render: GPU grid ─────────────────────────────────────────────
function renderNodes(nodes) {
// leasedByGpu: "nodeId:gpuId" → total MB currently leased (from active leases)
function renderNodes(nodes, leasedByGpu) {
const grid = document.getElementById('gpu-grid');
if (!nodes || nodes.length === 0) {
setChildren(grid, el('div', { text: 'No nodes registered.', style: { color: 'var(--dim)', fontSize: '0.8em', padding: '0.5rem' } }));
@ -216,33 +286,46 @@ function renderNodes(nodes) {
const cards = [];
for (const node of nodes) {
for (const gpu of node.gpus) {
const key = node.node_id + ':' + gpu.gpu_id;
const pct = gpu.vram_total_mb > 0 ? gpu.vram_used_mb / gpu.vram_total_mb : 0;
const usedGb = (gpu.vram_used_mb / 1024).toFixed(1);
const totalGb = (gpu.vram_total_mb / 1024).toFixed(1);
const color = vramColor(pct);
const key = node.node_id + ':' + gpu.gpu_id;
const total = gpu.vram_total_mb || 1;
const used = gpu.vram_used_mb;
const leased = leasedByGpu[key] || 0;
// Resident = nvidia-smi used minus actively leased; clamped to [0, used].
const resident = Math.max(0, Math.min(used - leased, used));
const pct = used / total;
if (!sparkHistory[key]) sparkHistory[key] = [];
sparkHistory[key].push(gpu.vram_used_mb);
sparkHistory[key].push(used);
if (sparkHistory[key].length > 20) sparkHistory[key].shift();
const statusCls = pct >= 0.9 ? 'full' : pct >= 0.1 ? 'busy' : 'idle';
const statusText = pct >= 0.9 ? 'saturated' : pct >= 0.1 ? Math.round(pct * 100) + '% used' : 'idle';
const card = el('div', { cls: 'gpu-card' });
const card = el('div', { cls: 'gpu-card' });
const nodeLabel = el('div', { cls: 'gpu-node', text: node.node_id.toUpperCase() + ' · GPU ' + gpu.gpu_id });
const nameLine = el('div', { cls: 'gpu-name', text: gpu.name || 'Unknown GPU' });
const track = el('div', { cls: 'vram-track' });
const fill = el('div', { cls: 'vram-fill', style: { width: (pct * 100).toFixed(1) + '%', background: color } });
track.appendChild(fill);
// Stacked bar: cyan (leased) → amber (resident) → dark bg (free).
const leasedPct = (leased / total * 100).toFixed(1);
const residentPct = (resident / total * 100).toFixed(1);
const track = el('div', { cls: 'vram-track' });
const fillLeased = el('div', { cls: 'vram-leased', style: { width: leasedPct + '%' } });
const fillResident = el('div', { cls: 'vram-resident', style: { left: leasedPct + '%', width: residentPct + '%' } });
append(track, fillLeased, fillResident);
const vramLbl = el('div', { cls: 'vram-label', text: usedGb + ' / ' + totalGb + ' GB' });
const statusEl = el('div', { cls: 'gpu-status ' + statusCls, text: statusText });
// Breakdown label when something is allocated.
let labelText = (used / 1024).toFixed(1) + ' / ' + (total / 1024).toFixed(1) + ' GB';
if (leased > 0 || resident > 0) {
const parts = [];
if (leased > 0) parts.push((leased / 1024).toFixed(1) + 'G leased');
if (resident > 0) parts.push((resident / 1024).toFixed(1) + 'G resident');
labelText += ' (' + parts.join(' · ') + ')';
}
const vramLbl = el('div', { cls: 'vram-label', text: labelText });
const statusEl = el('div', { cls: 'gpu-status ' + statusCls, text: statusText });
const sparkTrack = el('div', { cls: 'spark-track' });
sparkTrack.appendChild(buildSparkline(sparkHistory[key], gpu.vram_total_mb));
sparkTrack.appendChild(buildSparkline(sparkHistory[key], total));
append(card, nodeLabel, nameLine, track, vramLbl, statusEl, sparkTrack);
cards.push(card);
@ -252,6 +335,40 @@ function renderNodes(nodes) {
setChildren(grid, ...cards);
}
// ── render: warm models table ────────────────────────────────────
// Rebuild the Warm Models table body from /api/resident data.
function renderResidents(residents) {
  const tbody = document.getElementById('resident-body');
  if (!residents || residents.length === 0) {
    const tr = document.createElement('tr');
    const td = el('td', { cls: 'td-none', text: 'No warm models detected.' });
    td.setAttribute('colspan', '4');
    tr.appendChild(td);
    setChildren(tbody, tr);
    return;
  }
  // first_seen is a unix timestamp in seconds (assumed — TODO confirm
  // against /api/resident); missing values render as 0s warm.
  const now = Date.now() / 1000;
  const rows = residents.map(r => {
    const warmSecs = now - (r.first_seen || now);
    // Format age as "Ns", "Nm SSs", or "Nh MMm" depending on magnitude.
    const warmText = warmSecs < 60
      ? Math.floor(warmSecs) + 's'
      : warmSecs < 3600
        ? Math.floor(warmSecs / 60) + 'm ' + String(Math.floor(warmSecs % 60)).padStart(2, '0') + 's'
        : Math.floor(warmSecs / 3600) + 'h ' + String(Math.floor((warmSecs % 3600) / 60)).padStart(2, '0') + 'm';
    const tr = document.createElement('tr');
    append(tr,
      el('td', { cls: 'td-service', text: r.service }),
      el('td', { cls: 'td-node', text: r.node_id }),
      el('td', { cls: 'td-model', text: r.model_name || '—' }),
      el('td', { cls: 'td-warm', text: warmText }),
    );
    return tr;
  });
  setChildren(tbody, ...rows);
}
// ── render: leases table ─────────────────────────────────────────
function renderLeases(leases) {
const tbody = document.getElementById('leases-body');
@ -316,17 +433,33 @@ function clearError() { document.getElementById('error-banner').style.display =
// ── poll ─────────────────────────────────────────────────────────
async function poll() {
try {
const [nodesRes, leasesRes, healthRes] = await Promise.all([
const [nodesRes, leasesRes, residentRes, healthRes, servicesRes] = await Promise.all([
fetch('/api/nodes'),
fetch('/api/leases'),
fetch('/api/resident'),
fetch('/api/health'),
fetch('/api/services'),
]);
if (!nodesRes.ok || !leasesRes.ok) throw new Error('API error: ' + nodesRes.status);
const [nodesData, leasesData] = await Promise.all([nodesRes.json(), leasesRes.json()]);
const [nodesData, leasesData, residentData, servicesData] = await Promise.all([
nodesRes.json(), leasesRes.json(),
residentRes.ok ? residentRes.json() : Promise.resolve({ residents: [] }),
servicesRes.ok ? servicesRes.json() : Promise.resolve({ services: [] }),
]);
// Build per-GPU leased-MB index for the stacked bar.
const leasedByGpu = {};
for (const lease of (leasesData.leases || [])) {
const key = lease.node_id + ':' + lease.gpu_id;
leasedByGpu[key] = (leasedByGpu[key] || 0) + lease.mb_granted;
}
clearError();
renderHealth(healthRes.ok);
renderNodes(nodesData.nodes || []);
renderNodes(nodesData.nodes || [], leasedByGpu);
renderServices(servicesData.services || []);
renderLeases(leasesData.leases || []);
renderResidents(residentData.residents || []);
} catch (err) {
showError('Failed to reach coordinator: ' + err.message);
renderHealth(false);

View file

@ -3,7 +3,7 @@ from __future__ import annotations
import asyncio
from collections import defaultdict
from circuitforge_core.resources.models import VRAMLease
from circuitforge_core.resources.models import ResidentAllocation, VRAMLease
class LeaseManager:
@ -12,6 +12,9 @@ class LeaseManager:
self._gpu_total: dict[tuple[str, int], int] = {}
self._gpu_used: dict[tuple[str, int], int] = defaultdict(int)
self._lock = asyncio.Lock()
# Resident allocations — keyed "node_id:service", updated by heartbeat.
# No lock needed: only the single heartbeat task writes this dict.
self._residents: dict[str, ResidentAllocation] = {}
def register_gpu(self, node_id: str, gpu_id: int, total_mb: int) -> None:
self._gpu_total[(node_id, gpu_id)] = total_mb
@ -86,3 +89,42 @@ class LeaseManager:
def all_leases(self) -> list[VRAMLease]:
    """Return a snapshot list of every active lease."""
    return [lease for lease in self._leases.values()]
# ── resident tracking ────────────────────────────────────────────
def set_residents_for_node(
    self,
    node_id: str,
    residents: list[tuple[str, str | None]],  # (service, model_name)
) -> None:
    """
    Replace the resident snapshot for *node_id*.

    Entries for services the node no longer reports are dropped. An entry
    whose service and model_name both match the existing record is left
    untouched so its first_seen timestamp survives — the dashboard uses
    that to show how long a model has been warm.
    """
    prefix = f"{node_id}:"
    keep = {f"{node_id}:{svc}" for svc, _ in residents}
    # Drop services no longer running on this node.
    stale = [k for k in self._residents if k.startswith(prefix) and k not in keep]
    for k in stale:
        del self._residents[k]
    # Upsert; re-create (resetting first_seen) only when the model changed.
    for svc, model in residents:
        entry_key = f"{node_id}:{svc}"
        current = self._residents.get(entry_key)
        if current is not None and current.model_name == model:
            continue  # same model still loaded — keep original first_seen
        self._residents[entry_key] = ResidentAllocation(
            service=svc,
            node_id=node_id,
            model_name=model,
        )
def all_residents(self) -> list[ResidentAllocation]:
    """Return a snapshot list of every resident (warm) allocation."""
    return [entry for entry in self._residents.values()]
def resident_keys(self) -> set[str]:
"""Return set of 'node_id:service' strings for currently-warm services."""
return set(self._residents.keys())

View file

@ -0,0 +1,74 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from circuitforge_core.resources.coordinator.agent_supervisor import AgentRecord
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
_WARM_BONUS_MB = 1000


@dataclass(frozen=True)
class _Scored:
    """One candidate GPU annotated with the signals used for ranking."""
    node_id: str
    gpu_id: int
    vram_free_mb: int       # raw free VRAM reported by the agent
    effective_free_mb: int  # free VRAM + warm bonus, used as the tiebreaker
    can_fit: bool           # free VRAM >= half the service's max_mb
    warm: bool              # service already resident on this node


def select_node(
    agents: "dict[str, AgentRecord]",
    service: str,
    profile_registry: "ProfileRegistry",
    resident_keys: set[str],
) -> tuple[str, int] | None:
    """
    Pick the best (node_id, gpu_id) for the requested service.

    Warm nodes (service already running) get priority, then sorted by free
    VRAM. Returns None when the service appears in no public profile or no
    online agent exists.
    """
    service_max_mb = _find_service_max_mb(service, profile_registry)
    if service_max_mb is None:
        return None  # service not in any profile

    fit_threshold = service_max_mb // 2
    candidates: list[_Scored] = []
    for node_id, record in agents.items():
        if not record.online:
            continue
        # resident_keys are per-node ("node_id:service"), so warm status is
        # loop-invariant across a node's GPUs — compute it once per node.
        warm = f"{node_id}:{service}" in resident_keys
        bonus = _WARM_BONUS_MB if warm else 0
        for gpu in record.gpus:
            candidates.append(_Scored(
                node_id=node_id,
                gpu_id=gpu.gpu_id,
                vram_free_mb=gpu.vram_free_mb,
                effective_free_mb=gpu.vram_free_mb + bonus,
                can_fit=gpu.vram_free_mb >= fit_threshold,
                warm=warm,
            ))
    if not candidates:
        return None

    # Prefer: (1) warm nodes (model already resident — no cold start)
    #         (2) cold nodes that can fit the service (free >= half of max_mb)
    # Fallback: best-effort node when nothing fits and nothing is warm
    # (coordinator will attempt to start the service anyway; it may evict or fail)
    # Note: the warm bonus applies to all GPUs on a warm node — per-GPU resident
    # tracking would require a resident_key format change.
    preferred = [c for c in candidates if c.warm or c.can_fit]
    pool = preferred if preferred else candidates
    best = max(pool, key=lambda c: (c.warm, c.effective_free_mb))
    return best.node_id, best.gpu_id


def _find_service_max_mb(service: str, profile_registry: "ProfileRegistry") -> int | None:
    """Return max_mb for *service* from the first public profile declaring it, else None."""
    for profile in profile_registry.list_public():
        svc = profile.services.get(service)
        if svc is not None:
            return svc.max_mb
    return None

View file

@ -0,0 +1,170 @@
from __future__ import annotations
import dataclasses
import time
import uuid
from dataclasses import dataclass
from typing import Literal
@dataclass
class ServiceAllocation:
    """One caller's TTL-bound claim on a running service instance."""
    allocation_id: str
    service: str
    node_id: str
    gpu_id: int
    model: str | None
    caller: str
    url: str
    created_at: float
    expires_at: float  # 0 = no expiry


@dataclass
class ServiceInstance:
    """Lifecycle state of one service container on a (node, gpu) slot."""
    service: str
    node_id: str
    gpu_id: int
    state: Literal["starting", "running", "idle", "stopped"]
    model: str | None
    url: str | None
    idle_since: float | None = None  # set when the instance transitions to idle


class ServiceRegistry:
    """
    In-memory registry of service allocations and instance state.

    Allocations: per-caller request — many per service instance.
    Instances: per (service, node_id, gpu_id) — one per running container.
    """

    def __init__(self) -> None:
        self._allocations: dict[str, ServiceAllocation] = {}
        self._instances: dict[str, ServiceInstance] = {}  # key: "service:node_id:gpu_id"

    @staticmethod
    def _instance_key(service: str, node_id: str, gpu_id: int) -> str:
        """Canonical instance-dict key — single definition so all paths agree."""
        return f"{service}:{node_id}:{gpu_id}"

    # ── allocation API ────────────────────────────────────────────────

    def allocate(
        self,
        service: str,
        node_id: str,
        gpu_id: int,
        model: str | None,
        url: str,
        caller: str,
        ttl_s: float,
    ) -> ServiceAllocation:
        """
        Record a new allocation and return it.

        ttl_s <= 0 means the allocation never expires (expires_at == 0).
        Allocating against an idle/stopped instance flips it back to 'running'.
        """
        # Read the clock once so created_at and expires_at are exactly
        # ttl_s apart (two time.time() calls could drift by the call gap).
        now = time.time()
        alloc = ServiceAllocation(
            allocation_id=str(uuid.uuid4()),
            service=service,
            node_id=node_id,
            gpu_id=gpu_id,
            model=model,
            caller=caller,
            url=url,
            created_at=now,
            expires_at=now + ttl_s if ttl_s > 0 else 0.0,
        )
        self._allocations[alloc.allocation_id] = alloc
        # If an instance exists in idle/stopped state, mark it running again.
        key = self._instance_key(service, node_id, gpu_id)
        inst = self._instances.get(key)
        if inst is not None and inst.state in ("idle", "stopped"):
            self._instances[key] = dataclasses.replace(
                inst, state="running", idle_since=None
            )
        return alloc

    def release(self, allocation_id: str) -> bool:
        """
        Remove an allocation; returns False if the id is unknown.
        When it was the instance's last allocation, the instance goes idle.
        """
        alloc = self._allocations.pop(allocation_id, None)
        if alloc is None:
            return False
        # If no active allocations remain for this instance, mark it idle.
        if self.active_allocations(alloc.service, alloc.node_id, alloc.gpu_id) == 0:
            key = self._instance_key(alloc.service, alloc.node_id, alloc.gpu_id)
            if key in self._instances:
                self._instances[key] = dataclasses.replace(
                    self._instances[key], state="idle", idle_since=time.time()
                )
        return True

    def active_allocations(self, service: str, node_id: str, gpu_id: int) -> int:
        """Count live allocations pinned to one (service, node, gpu) instance."""
        return sum(
            1 for a in self._allocations.values()
            if a.service == service and a.node_id == node_id and a.gpu_id == gpu_id
        )

    # ── instance API ─────────────────────────────────────────────────

    def upsert_instance(
        self,
        service: str,
        node_id: str,
        gpu_id: int,
        state: Literal["starting", "running", "idle", "stopped"],
        model: str | None,
        url: str | None,
    ) -> ServiceInstance:
        """Create or replace the instance record for a (service, node, gpu) slot."""
        key = self._instance_key(service, node_id, gpu_id)
        existing = self._instances.get(key)
        idle_since: float | None = None
        if state == "idle":
            # Preserve idle_since if already idle; set now when transitioning into idle.
            idle_since = existing.idle_since if (existing and existing.state == "idle") else time.time()
        inst = ServiceInstance(
            service=service, node_id=node_id, gpu_id=gpu_id,
            state=state, model=model, url=url, idle_since=idle_since,
        )
        self._instances[key] = inst
        return inst

    def get_allocation(self, allocation_id: str) -> ServiceAllocation | None:
        """Look up an allocation by id; None if released/expired/unknown."""
        return self._allocations.get(allocation_id)

    def sweep_expired_allocations(self) -> list[str]:
        """
        Remove all allocations whose TTL has elapsed and transition the
        corresponding instance to 'idle' if no active allocations remain.
        Returns the list of expired allocation_ids.
        """
        now = time.time()
        expired = [
            alloc_id
            for alloc_id, alloc in self._allocations.items()
            if alloc.expires_at > 0 and now > alloc.expires_at
        ]
        for alloc_id in expired:
            self.release(alloc_id)  # reuses the last-allocation idle transition
        return expired

    def all_allocations(self) -> list[ServiceAllocation]:
        """Snapshot of every live allocation."""
        return list(self._allocations.values())

    def all_instances(self) -> list[ServiceInstance]:
        """Snapshot of every known instance record."""
        return list(self._instances.values())

    def mark_stopped(self, service: str, node_id: str, gpu_id: int) -> None:
        """Transition an instance to 'stopped' state and clear idle_since."""
        key = self._instance_key(service, node_id, gpu_id)
        if key in self._instances:
            self._instances[key] = dataclasses.replace(
                self._instances[key], state="stopped", idle_since=None
            )

    def idle_past_timeout(self, idle_stop_config: dict[str, int]) -> list[ServiceInstance]:
        """
        Return instances in 'idle' state whose idle time exceeds their
        configured timeout. idle_stop_config maps service name -> seconds;
        0 (or a missing entry) means never stop automatically.
        """
        now = time.time()
        overdue: list[ServiceInstance] = []
        for inst in self._instances.values():
            if inst.state != "idle" or inst.idle_since is None:
                continue
            timeout = idle_stop_config.get(inst.service, 0)
            if timeout > 0 and (now - inst.idle_since) >= timeout:
                overdue.append(inst)
        return overdue

View file

@ -6,6 +6,7 @@ services:
vllm:
max_mb: 12288
priority: 1
idle_stop_after_s: 600
ollama:
max_mb: 12288
priority: 1
@ -14,6 +15,11 @@ services:
priority: 2
shared: true
max_concurrent: 4
cf-docuvision:
max_mb: 6144
priority: 2
shared: true
max_concurrent: 3
cf-stt:
max_mb: 1200
priority: 2

View file

@ -6,6 +6,7 @@ services:
vllm:
max_mb: 20480
priority: 1
idle_stop_after_s: 600
ollama:
max_mb: 18432
priority: 1
@ -14,6 +15,11 @@ services:
priority: 2
shared: true
max_concurrent: 6
cf-docuvision:
max_mb: 8192
priority: 2
shared: true
max_concurrent: 4
cf-stt:
max_mb: 1200
priority: 2

View file

@ -6,6 +6,17 @@ services:
vllm:
max_mb: 4096
priority: 1
idle_stop_after_s: 600
managed:
type: docker
image: "vllm/vllm-openai:v0.9.2"
port: 8000
host_port: 8000
command_template: "--model /models/{model} --trust-remote-code --max-model-len {max_model_len} --gpu-memory-utilization {gpu_mem_util} --enforce-eager --max-num-seqs 8"
volumes:
- "${VLLM_MODELS_DIR:-/Library/Assets/LLM/vllm/models}:/models"
runtime: nvidia
ipc: host
ollama:
max_mb: 3584
priority: 1
@ -14,6 +25,11 @@ services:
priority: 2
shared: true
max_concurrent: 2
cf-docuvision:
max_mb: 3072
priority: 2
shared: true
max_concurrent: 1
cf-stt:
max_mb: 600
priority: 2

View file

@ -6,6 +6,17 @@ services:
vllm:
max_mb: 5120
priority: 1
idle_stop_after_s: 600
managed:
type: docker
image: "vllm/vllm-openai:v0.9.2"
port: 8000
host_port: 8000
command_template: "--model /models/{model} --trust-remote-code --max-model-len {max_model_len} --gpu-memory-utilization {gpu_mem_util} --enforce-eager --max-num-seqs 8"
volumes:
- "${VLLM_MODELS_DIR:-/Library/Assets/LLM/vllm/models}:/models"
runtime: nvidia
ipc: host
ollama:
max_mb: 4096
priority: 1
@ -14,6 +25,11 @@ services:
priority: 2
shared: true
max_concurrent: 3
cf-docuvision:
max_mb: 4096
priority: 2
shared: true
max_concurrent: 2
cf-stt:
max_mb: 1200
priority: 2
@ -28,6 +44,13 @@ services:
comfyui:
max_mb: 6144
priority: 4
managed:
type: process
exec_path: "/opt/miniconda3/envs/comfyui/bin/python"
args_template: "/opt/ComfyUI/main.py --listen 0.0.0.0 --port {port} --cuda-device {gpu_id}"
cwd: "/opt/ComfyUI"
port: 8188
host_port: 8188
model_size_hints:
llm_max_params: 8b
image_gen_max: sdxl-fp8

View file

@ -5,22 +5,72 @@ from pathlib import Path
from typing import Any
import yaml
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, model_validator
SUPPORTED_SCHEMA_VERSION = 1
class DockerSpec(BaseModel):
    """Spec for a Docker-managed service (parsed from a profile's `managed:` block)."""
    image: str                    # container image, e.g. "vllm/vllm-openai:v0.9.2"
    port: int                     # port the service listens on inside the container
    host_port: int                # host-side port it is exposed on
    command_template: str = ""    # container args; may contain {placeholders} like {model} — see profile yaml
    volumes: list[str] = Field(default_factory=list)   # "host:container" bind-mount strings
    env: dict[str, str] = Field(default_factory=dict)  # extra environment variables for the container
    runtime: str = "nvidia"       # presumably passed as docker --runtime — confirm in the service launcher
    ipc: str = "host"             # presumably passed as docker --ipc — confirm in the service launcher
    model_config = {"frozen": True}  # immutable after parse
class ProcessSpec(BaseModel):
    """Spec for a process-managed service (non-Docker, e.g. conda env)."""
    exec_path: str                # path to the executable to spawn
    args_template: str = ""       # CLI args; may contain {placeholders} like {port}, {gpu_id} — see profile yaml
    cwd: str = ""                 # working directory ("" presumably means inherit — confirm in launcher)
    env: dict[str, str] = Field(default_factory=dict)  # extra environment variables for the process
    port: int = 0                 # service port (0 = unspecified)
    host_port: int = 0            # host-side port (0 = unspecified)
    model_config = {"frozen": True}  # immutable after parse
class ServiceProfile(BaseModel):
    """One service's scheduling/lifecycle settings within a resource profile."""
    max_mb: int                   # VRAM budget for the service, in MB
    priority: int                 # relative scheduling priority — semantics defined by the coordinator
    shared: bool = False          # True: instance may serve multiple consumers (see max_concurrent)
    max_concurrent: int = 1       # consumer cap when shared
    always_on: bool = False       # NOTE(review): presumably exempts from idle stop — confirm in sweep logic
    idle_stop_after_s: int = 0    # auto-stop after this many idle seconds; 0 = never
    backend: str | None = None
    consumers: list[str] = Field(default_factory=list)
    managed: DockerSpec | ProcessSpec | None = None  # set when the orchestrator launches the service itself
    model_config = {"frozen": True}

    @model_validator(mode="before")
    @classmethod
    def _parse_managed(cls, values: Any) -> Any:
        """
        Pre-validation hook: coerce a raw `managed:` mapping into a
        DockerSpec or ProcessSpec based on its `type` discriminator.
        Non-dict inputs pass through for pydantic's own validation.
        """
        if not isinstance(values, dict):
            return values
        raw = values.get("managed")
        if raw is None:
            return values
        if not isinstance(raw, dict):
            # Already a spec instance (e.g. re-validation) — leave as-is.
            return values
        spec_type = raw.get("type")
        # `type` is a discriminator only; strip it before building the spec.
        managed_fields = {k: v for k, v in raw.items() if k != "type"}
        if spec_type == "docker":
            values["managed"] = DockerSpec(**managed_fields)
        elif spec_type == "process":
            values["managed"] = ProcessSpec(**managed_fields)
        else:
            raise ValueError(f"Unknown managed service type: {spec_type!r}")
        return values
class GpuNodeEntry(BaseModel):
id: int

View file

@ -0,0 +1,93 @@
import asyncio
import time
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry, ServiceInstance
def test_build_idle_stop_config_empty_without_registry():
    """Without a profile registry the idle-stop config is empty."""
    lm = LeaseManager()
    supervisor = AgentSupervisor(lease_manager=lm)
    assert supervisor._build_idle_stop_config() == {}


def test_build_idle_stop_config_from_profiles():
    """idle_stop_after_s values from profile services land in the config map."""
    lm = LeaseManager()
    mock_svc = MagicMock()
    mock_svc.idle_stop_after_s = 600
    mock_profile = MagicMock()
    mock_profile.services = {"vllm": mock_svc}
    mock_profile_registry = MagicMock()
    mock_profile_registry.list_public.return_value = [mock_profile]
    supervisor = AgentSupervisor(lease_manager=lm, profile_registry=mock_profile_registry)
    config = supervisor._build_idle_stop_config()
    assert config == {"vllm": 600}
@pytest.mark.asyncio
async def test_run_idle_sweep_posts_stop():
    """An instance idle past its configured timeout triggers a stop POST to its agent."""
    lm = LeaseManager()
    service_registry = ServiceRegistry()
    # Upsert instance as running, then allocate + release to transition it to idle
    service_registry.upsert_instance(
        service="vllm",
        node_id="heimdall",
        gpu_id=0,
        state="running",
        model="test-model",
        url="http://heimdall:8000",
    )
    alloc = service_registry.allocate(
        service="vllm",
        node_id="heimdall",
        gpu_id=0,
        model="test-model",
        url="http://heimdall:8000",
        caller="test",
        ttl_s=300.0,
    )
    service_registry.release(alloc.allocation_id)
    # Backdate idle_since so it exceeds the timeout
    import dataclasses
    key = "vllm:heimdall:0"
    inst = service_registry._instances[key]
    service_registry._instances[key] = dataclasses.replace(inst, idle_since=time.time() - 700)
    # Profile registry stub that reports a 600 s idle timeout for vllm.
    mock_profile_registry = MagicMock()
    mock_svc = MagicMock()
    mock_svc.idle_stop_after_s = 600
    mock_profile = MagicMock()
    mock_profile.services = {"vllm": mock_svc}
    mock_profile_registry.list_public.return_value = [mock_profile]
    supervisor = AgentSupervisor(
        lease_manager=lm,
        service_registry=service_registry,
        profile_registry=mock_profile_registry,
    )
    supervisor.register("heimdall", "http://heimdall:7701")
    posted_urls = []

    # Capture outgoing stop requests instead of doing real HTTP.
    async def fake_http_post(url: str) -> bool:
        posted_urls.append(url)
        return True

    supervisor._http_post = fake_http_post
    await supervisor._run_idle_sweep()
    assert len(posted_urls) == 1
    assert posted_urls[0] == "http://heimdall:7701/services/vllm/stop"
@pytest.mark.asyncio
async def test_run_idle_sweep_skips_without_registry():
    """Sweep is a no-op when the supervisor has no service registry."""
    lm = LeaseManager()
    supervisor = AgentSupervisor(lease_manager=lm)
    # Should return immediately without error
    await supervisor._run_idle_sweep()

View file

@ -0,0 +1,94 @@
import json
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
import httpretty
from circuitforge_core.resources.client import CFOrchClient, Allocation
_ALLOC_BODY = (
'{"allocation_id":"abc123","service":"vllm","node_id":"heimdall",'
'"gpu_id":0,"model":"Ouro-1.4B","url":"http://heimdall:8000","started":false,"warm":true}'
)
@httpretty.activate
def test_sync_allocate_returns_allocation():
    """Successful POST /allocate yields an Allocation; context exit DELETEs it."""
    httpretty.register_uri(
        httpretty.POST, "http://orch:7700/api/services/vllm/allocate",
        body=_ALLOC_BODY, content_type="application/json",
    )
    httpretty.register_uri(
        httpretty.DELETE, "http://orch:7700/api/services/vllm/allocations/abc123",
        body='{"released":true}', content_type="application/json",
    )
    client = CFOrchClient("http://orch:7700")
    with client.allocate("vllm", model_candidates=["Ouro-1.4B"], caller="test") as alloc:
        assert isinstance(alloc, Allocation)
        assert alloc.url == "http://heimdall:8000"
        assert alloc.model == "Ouro-1.4B"
        assert alloc.allocation_id == "abc123"
    # Release happens on context exit — last wire request must be the DELETE.
    assert httpretty.last_request().method == "DELETE"


@httpretty.activate
def test_sync_allocate_ignores_404_on_release():
    """A 404 on DELETE during release is swallowed rather than raised."""
    httpretty.register_uri(
        httpretty.POST, "http://orch:7700/api/services/vllm/allocate",
        body='{"allocation_id":"xyz","service":"vllm","node_id":"a","gpu_id":0,'
             '"model":"m","url":"http://a:8000","started":false,"warm":false}',
        content_type="application/json",
    )
    httpretty.register_uri(
        httpretty.DELETE, "http://orch:7700/api/services/vllm/allocations/xyz",
        status=404, body='{"detail":"not found"}', content_type="application/json",
    )
    client = CFOrchClient("http://orch:7700")
    with client.allocate("vllm", model_candidates=["m"]) as alloc:
        assert alloc.url == "http://a:8000"
    # No exception raised — 404 on release is silently ignored


@httpretty.activate
def test_sync_allocate_raises_on_503():
    """A 503 from the coordinator surfaces as RuntimeError from allocate()."""
    httpretty.register_uri(
        httpretty.POST, "http://orch:7700/api/services/vllm/allocate",
        status=503, body='{"detail":"no capacity"}', content_type="application/json",
    )
    client = CFOrchClient("http://orch:7700")
    with pytest.raises(RuntimeError, match="cf-orch allocation failed"):
        with client.allocate("vllm", model_candidates=["m"]):
            pass
@pytest.mark.asyncio
async def test_async_allocate_works():
    """allocate_async yields an Allocation and DELETEs it on context exit.

    Fix: this async test was missing the @pytest.mark.asyncio marker that the
    suite's other async tests carry; under pytest-asyncio's strict mode a bare
    `async def test_` is skipped with a warning instead of being executed.
    """
    # httpretty only patches stdlib sockets; httpx async uses anyio sockets so
    # we mock httpx.AsyncClient directly instead.
    alloc_data = {
        "allocation_id": "a1", "service": "vllm", "node_id": "n",
        "gpu_id": 0, "model": "m", "url": "http://n:8000",
        "started": False, "warm": False,
    }
    release_data = {"released": True}

    def _make_response(data, status_code=200):
        # Minimal stand-in for an httpx.Response.
        resp = MagicMock()
        resp.is_success = status_code < 400
        resp.status_code = status_code
        resp.json.return_value = data
        return resp

    mock_post = AsyncMock(return_value=_make_response(alloc_data))
    mock_delete = AsyncMock(return_value=_make_response(release_data))
    mock_async_client = MagicMock()
    mock_async_client.post = mock_post
    mock_async_client.delete = mock_delete
    mock_async_client.__aenter__ = AsyncMock(return_value=mock_async_client)
    mock_async_client.__aexit__ = AsyncMock(return_value=False)
    with patch("httpx.AsyncClient", return_value=mock_async_client):
        client = CFOrchClient("http://orch:7700")
        async with client.allocate_async("vllm", model_candidates=["m"]) as alloc:
            assert alloc.url == "http://n:8000"
            assert alloc.allocation_id == "a1"
    # Release must go out exactly once when the async context closes.
    mock_delete.assert_called_once()

View file

@ -0,0 +1,132 @@
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from fastapi.testclient import TestClient
from circuitforge_core.resources.coordinator.app import create_coordinator_app
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry
from circuitforge_core.resources.coordinator.agent_supervisor import AgentRecord
from circuitforge_core.resources.models import GpuInfo, NodeInfo
def _make_supervisor_mock(online: bool = True):
    """Supervisor stub exposing one 'heimdall' node with a single fully-free 8 GB GPU."""
    sup = MagicMock()
    record = AgentRecord(node_id="heimdall", agent_url="http://heimdall:7701")
    record.gpus = [GpuInfo(0, "RTX 4000", 8192, 0, 8192)]
    record.online = online
    sup.online_agents.return_value = {"heimdall": record} if online else {}
    sup.get_node_info.return_value = NodeInfo(
        node_id="heimdall",
        agent_url="http://heimdall:7701",
        gpus=record.gpus,
        last_heartbeat=0.0,
    )
    return sup


@pytest.fixture
def alloc_client():
    # Fresh coordinator app wired with real registries and the mocked supervisor.
    lm = LeaseManager()
    pr = ProfileRegistry()
    sup = _make_supervisor_mock()
    sr = ServiceRegistry()
    app = create_coordinator_app(lease_manager=lm, profile_registry=pr, agent_supervisor=sup, service_registry=sr)
    return TestClient(app), sup, sr
def test_allocate_returns_allocation_id_and_url(alloc_client):
    """POST /allocate returns an allocation id plus the selected node's URL."""
    client, sup, sr = alloc_client
    # Patch the coordinator's outbound agent call so no real HTTP happens.
    with patch("httpx.AsyncClient") as mock_http:
        mock_resp = MagicMock()
        mock_resp.is_success = True
        mock_resp.json.return_value = {"running": True, "url": "http://heimdall:8000"}
        mock_http.return_value.__aenter__.return_value.post = AsyncMock(return_value=mock_resp)
        resp = client.post("/api/services/vllm/allocate", json={
            "model_candidates": ["Ouro-1.4B"],
            "ttl_s": 300.0,
            "caller": "test",
        })
    assert resp.status_code == 200
    data = resp.json()
    assert "allocation_id" in data
    assert data["service"] == "vllm"
    assert data["node_id"] == "heimdall"
    assert data["url"] == "http://heimdall:8000"


def test_allocate_returns_503_when_no_online_nodes(alloc_client):
    """No online agents -> 503 from /allocate."""
    client, sup, sr = alloc_client
    sup.online_agents.return_value = {}
    resp = client.post("/api/services/vllm/allocate", json={"model_candidates": ["Ouro-1.4B"]})
    assert resp.status_code == 503


def test_allocate_returns_422_for_empty_candidates(alloc_client):
    """An empty model_candidates list is a validation error."""
    client, _, sr = alloc_client
    resp = client.post("/api/services/vllm/allocate", json={"model_candidates": []})
    assert resp.status_code == 422


def test_allocate_returns_422_for_unknown_service(alloc_client):
    """A service absent from every profile is a validation error."""
    client, _, sr = alloc_client
    resp = client.post("/api/services/cf-made-up/allocate", json={"model_candidates": ["x"]})
    assert resp.status_code == 422


def test_allocate_records_in_registry(alloc_client):
    """A successful allocation shows up in GET /status for the service."""
    client, sup, sr = alloc_client
    with patch("httpx.AsyncClient") as mock_http:
        mock_resp = MagicMock()
        mock_resp.is_success = True
        mock_resp.json.return_value = {"running": True, "url": "http://heimdall:8000"}
        mock_http.return_value.__aenter__.return_value.post = AsyncMock(return_value=mock_resp)
        resp = client.post("/api/services/vllm/allocate", json={
            "model_candidates": ["Ouro-1.4B"],
            "ttl_s": 300.0,
            "caller": "test",
        })
    assert resp.status_code == 200
    allocation_id = resp.json()["allocation_id"]
    status_resp = client.get("/api/services/vllm/status")
    assert status_resp.status_code == 200
    status_data = status_resp.json()
    assert status_data["service"] == "vllm"
    alloc_ids = [a["allocation_id"] for a in status_data["allocations"]]
    assert allocation_id in alloc_ids


def test_release_allocation(alloc_client):
    """DELETE on an allocation releases it and removes it from /status."""
    client, sup, sr = alloc_client
    with patch("httpx.AsyncClient") as mock_http:
        mock_resp = MagicMock()
        mock_resp.is_success = True
        mock_resp.json.return_value = {"running": True, "url": "http://heimdall:8000"}
        mock_http.return_value.__aenter__.return_value.post = AsyncMock(return_value=mock_resp)
        resp = client.post("/api/services/vllm/allocate", json={
            "model_candidates": ["Ouro-1.4B"],
            "ttl_s": 300.0,
            "caller": "test",
        })
    assert resp.status_code == 200
    allocation_id = resp.json()["allocation_id"]
    del_resp = client.delete(f"/api/services/vllm/allocations/{allocation_id}")
    assert del_resp.status_code == 200
    assert del_resp.json() == {"released": True, "allocation_id": allocation_id}
    status_resp = client.get("/api/services/vllm/status")
    alloc_ids = [a["allocation_id"] for a in status_resp.json()["allocations"]]
    assert allocation_id not in alloc_ids


def test_release_allocation_not_found(alloc_client):
    """Releasing an unknown allocation id yields 404."""
    client, _, sr = alloc_client
    resp = client.delete("/api/services/vllm/allocations/bad-id")
    assert resp.status_code == 404

View file

@ -1,10 +1,14 @@
import pytest
from unittest.mock import MagicMock
from pathlib import Path
from fastapi.testclient import TestClient
from circuitforge_core.resources.coordinator.app import create_coordinator_app
from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry
from circuitforge_core.resources.models import GpuInfo, NodeInfo
from circuitforge_core.resources.profiles.schema import load_profile
@pytest.fixture
@ -32,6 +36,7 @@ def coordinator_client():
lease_manager=lease_manager,
profile_registry=profile_registry,
agent_supervisor=supervisor,
service_registry=ServiceRegistry(),
)
return TestClient(app), lease_manager
@ -112,3 +117,67 @@ def test_dashboard_serves_html(coordinator_client):
assert "cf-orch" in resp.text
assert "/api/nodes" in resp.text
assert "/api/leases" in resp.text
def test_online_agents_excludes_offline():
    """online_agents() returns only records currently flagged online."""
    lm = LeaseManager()
    sup = AgentSupervisor(lm)
    sup.register("online_node", "http://a:7701")
    sup.register("offline_node", "http://b:7701")
    sup._agents["online_node"].online = True
    sup._agents["offline_node"].online = False
    result = sup.online_agents()
    assert "online_node" in result
    assert "offline_node" not in result


def test_resident_keys_returns_set_of_node_service():
    """resident_keys() exposes 'node:service' strings for warm services."""
    lm = LeaseManager()
    lm.set_residents_for_node("heimdall", [("vllm", "Ouro-1.4B"), ("ollama", None)])
    keys = lm.resident_keys()
    assert keys == {"heimdall:vllm", "heimdall:ollama"}


def test_single_gpu_8gb_profile_has_idle_stop_after_s():
    """The shipped single-gpu-8gb profile opts vllm into the 600 s idle stop."""
    profile = load_profile(
        Path("circuitforge_core/resources/profiles/public/single-gpu-8gb.yaml")
    )
    vllm_svc = profile.services.get("vllm")
    assert vllm_svc is not None
    assert hasattr(vllm_svc, "idle_stop_after_s")
    assert vllm_svc.idle_stop_after_s == 600


def test_ensure_service_returns_503_when_vram_too_low():
    """VRAM pre-flight guard fires before any HTTP request when free VRAM < max_mb // 2."""
    # vllm max_mb = 5120 → threshold = 2560 MB; 100 MB free triggers 503.
    lease_manager = LeaseManager()
    lease_manager.register_gpu("low-vram-node", 0, 512)
    profile_registry = ProfileRegistry()
    supervisor = MagicMock()
    supervisor.get_node_info.return_value = NodeInfo(
        node_id="low-vram-node",
        agent_url="http://localhost:7701",
        gpus=[GpuInfo(gpu_id=0, name="GTX 1050",
                      vram_total_mb=512, vram_used_mb=412, vram_free_mb=100)],
        last_heartbeat=0.0,
    )
    supervisor.all_nodes.return_value = []
    app = create_coordinator_app(
        lease_manager=lease_manager,
        profile_registry=profile_registry,
        agent_supervisor=supervisor,
        service_registry=ServiceRegistry(),
    )
    client = TestClient(app)
    resp = client.post("/api/services/vllm/ensure", json={
        "node_id": "low-vram-node",
        "gpu_id": 0,
        "params": {"model": "some-model"},
    })
    assert resp.status_code == 503
    assert "Insufficient VRAM" in resp.json()["detail"]
    # Guard must fire before any agent HTTP call is attempted.
    supervisor.get_node_info.assert_called_once_with("low-vram-node")

View file

@ -11,6 +11,7 @@ from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor
from circuitforge_core.resources.coordinator.app import create_coordinator_app
from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry
from circuitforge_core.resources.models import GpuInfo, NodeInfo
@ -47,6 +48,7 @@ def system():
lease_manager=lease_manager,
profile_registry=profile_registry,
agent_supervisor=mock_supervisor,
service_registry=ServiceRegistry(),
)
client = TestClient(app)
return client, lease_manager

View file

@ -0,0 +1,56 @@
import pytest
from circuitforge_core.resources.coordinator.node_selector import select_node
from circuitforge_core.resources.coordinator.agent_supervisor import AgentRecord
from circuitforge_core.resources.models import GpuInfo
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
def _make_agent(node_id: str, free_mb: int, online: bool = True) -> AgentRecord:
r = AgentRecord(node_id=node_id, agent_url=f"http://{node_id}:7701")
r.gpus = [GpuInfo(gpu_id=0, name="RTX", vram_total_mb=8192,
vram_used_mb=8192 - free_mb, vram_free_mb=free_mb)]
r.online = online
return r
def test_selects_node_with_most_free_vram():
agents = {
"a": _make_agent("a", free_mb=2000),
"b": _make_agent("b", free_mb=6000),
}
registry = ProfileRegistry()
result = select_node(agents, "vllm", registry, resident_keys=set())
assert result == ("b", 0)
def test_prefers_warm_node_even_with_less_free_vram():
agents = {
"a": _make_agent("a", free_mb=2000),
"b": _make_agent("b", free_mb=6000),
}
registry = ProfileRegistry()
result = select_node(agents, "vllm", registry, resident_keys={"a:vllm"})
assert result == ("a", 0)
def test_excludes_offline_nodes():
agents = {
"a": _make_agent("a", free_mb=8000, online=False),
"b": _make_agent("b", free_mb=2000, online=True),
}
registry = ProfileRegistry()
result = select_node(agents, "vllm", registry, resident_keys=set())
assert result == ("b", 0)
def test_returns_none_when_no_node_has_profile_for_service():
agents = {"a": _make_agent("a", free_mb=8000)}
registry = ProfileRegistry()
result = select_node(agents, "cf-nonexistent-service", registry, resident_keys=set())
assert result is None
def test_returns_none_when_no_agents():
    """With an empty agent map there is nothing to place on."""
    assert select_node({}, "vllm", ProfileRegistry(), resident_keys=set()) is None

View file

@ -0,0 +1,86 @@
import time
import dataclasses
import pytest
from circuitforge_core.resources.coordinator.service_registry import (
ServiceRegistry, ServiceAllocation, ServiceInstance,
)
@pytest.fixture
def registry():
    """Provide a fresh, empty ServiceRegistry for each test."""
    fresh = ServiceRegistry()
    return fresh
def test_allocate_creates_allocation(registry):
    """allocate() returns an allocation carrying the request fields and a fresh id."""
    lease = registry.allocate(
        service="vllm",
        node_id="heimdall",
        gpu_id=0,
        model="Ouro-1.4B",
        url="http://heimdall:8000",
        caller="test",
        ttl_s=300.0,
    )
    assert lease.service == "vllm"
    assert lease.node_id == "heimdall"
    # The id must be a non-empty string (a UUID).
    assert lease.allocation_id
def test_active_allocations_count(registry):
    """Two live leases on the same (service, node, gpu) key count as two."""
    for caller in ("a", "b"):
        registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", caller, 300.0)
    assert registry.active_allocations("vllm", "heimdall", 0) == 2
def test_release_decrements_count(registry):
    """Releasing the only lease brings the active count back to zero."""
    lease = registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "a", 300.0)
    registry.release(lease.allocation_id)
    remaining = registry.active_allocations("vllm", "heimdall", 0)
    assert remaining == 0
def test_release_nonexistent_returns_false(registry):
    """release() signals an unknown id with False rather than raising."""
    ok = registry.release("nonexistent-id")
    assert ok is False
def test_upsert_instance_sets_running_state(registry):
    """upsert_instance() registers exactly one instance in the requested state."""
    registry.upsert_instance(
        "vllm", "heimdall", 0,
        state="running", model="Ouro-1.4B", url="http://heimdall:8000",
    )
    listed = registry.all_instances()
    assert len(listed) == 1
    assert listed[0].state == "running"
def test_release_last_alloc_marks_instance_idle(registry):
    """Dropping the final lease flips the instance to idle and stamps idle_since."""
    registry.upsert_instance(
        "vllm", "heimdall", 0,
        state="running", model="Ouro-1.4B", url="http://heimdall:8000",
    )
    lease = registry.allocate(
        "vllm", "heimdall", 0, "Ouro-1.4B", "http://heimdall:8000", "a", 300.0
    )
    registry.release(lease.allocation_id)
    inst = registry.all_instances()[0]
    assert inst.state == "idle"
    assert inst.idle_since is not None
def test_new_alloc_on_idle_instance_marks_it_running(registry):
    """A fresh lease against an idle instance wakes it back to running."""
    registry.upsert_instance(
        "vllm", "heimdall", 0, state="idle", model="M", url="http://h:8000"
    )
    registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "x", 300.0)
    inst = registry.all_instances()[0]
    assert inst.state == "running"
def test_sweep_expired_allocations(registry):
    """sweep_expired_allocations() drops TTL-expired leases and idles the instance.

    Uses a sub-second TTL (0.1 s) with a 2.5x sleep margin so the suite does not
    stall: the previous version slept 1.1 s per run for no additional coverage.
    Also passes ttl_s as a float, consistent with the other tests in this module.
    """
    # Register a running instance so the idle-transition logic has a target.
    registry.upsert_instance("vllm", "heimdall", 0, state="running",
                             model="M", url="http://h:8000")
    # Create an allocation with a very short TTL.
    ttl_s = 0.1
    alloc = registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000",
                              "caller", ttl_s=ttl_s)
    assert registry.active_allocations("vllm", "heimdall", 0) == 1
    # Wait comfortably past the TTL so expiry is unambiguous.
    time.sleep(ttl_s * 2.5)
    expired = registry.sweep_expired_allocations()
    # The expired allocation id is reported by the sweep...
    assert alloc.allocation_id in expired
    # ...and no longer counts as active.
    assert registry.active_allocations("vllm", "heimdall", 0) == 0
    # With no allocations left, the instance must transition to idle.
    instance = registry.all_instances()[0]
    assert instance.state == "idle"
    assert instance.idle_since is not None