diff --git a/circuitforge_core/llm/router.py b/circuitforge_core/llm/router.py index e404fbe..3a22d81 100644 --- a/circuitforge_core/llm/router.py +++ b/circuitforge_core/llm/router.py @@ -3,12 +3,15 @@ LLM abstraction layer with priority fallback chain. Reads config from ~/.config/circuitforge/llm.yaml. Tries backends in order; falls back on any error. """ +import logging import os import yaml import requests from pathlib import Path from openai import OpenAI +logger = logging.getLogger(__name__) + CONFIG_PATH = Path.home() / ".config" / "circuitforge" / "llm.yaml" @@ -38,6 +41,33 @@ class LLMRouter: models = client.models.list() return models.data[0].id + def _try_cf_orch_alloc(self, backend: dict) -> "tuple | None": + """ + If backend config has a cf_orch block and CF_ORCH_URL is set (env takes + precedence over yaml url), allocate via cf-orch and return (ctx, alloc). + Returns None if not configured or allocation fails. + Caller MUST call ctx.__exit__(None, None, None) in a finally block. + """ + import os + orch_cfg = backend.get("cf_orch") + if not orch_cfg: + return None + orch_url = os.environ.get("CF_ORCH_URL", orch_cfg.get("url", "")) + if not orch_url: + return None + try: + from circuitforge_core.resources.client import CFOrchClient + client = CFOrchClient(orch_url) + service = orch_cfg.get("service", "vllm") + candidates = orch_cfg.get("model_candidates", []) + ttl_s = float(orch_cfg.get("ttl_s", 3600.0)) + ctx = client.allocate(service, model_candidates=candidates, ttl_s=ttl_s, caller="llm-router") + alloc = ctx.__enter__() + return (ctx, alloc) + except Exception as exc: + logger.warning("[LLMRouter] cf_orch allocation failed, using base_url directly: %s", exc) + return None + def complete(self, prompt: str, system: str | None = None, model_override: str | None = None, fallback_order: list[str] | None = None, @@ -105,6 +135,12 @@ class LLMRouter: if not self._is_reachable(backend["base_url"]): print(f"[LLMRouter] {name}: unreachable, skipping") continue + # --- cf_orch: optionally override base_url with coordinator-allocated URL --- + orch_ctx = orch_alloc = None + orch_result = self._try_cf_orch_alloc(backend) + if orch_result is not None: + orch_ctx, orch_alloc = orch_result + backend = {**backend, "base_url": orch_alloc.url + "/v1"} try: client = OpenAI( base_url=backend["base_url"], @@ -136,6 +172,12 @@ class LLMRouter: except Exception as e: print(f"[LLMRouter] {name}: error — {e}, trying next") continue + finally: + if orch_ctx is not None: + try: + orch_ctx.__exit__(None, None, None) + except Exception: + pass elif backend["type"] == "anthropic": api_key = os.environ.get(backend["api_key_env"], "") diff --git a/circuitforge_core/resources/__init__.py b/circuitforge_core/resources/__init__.py index e69de29..8bf5235 100644 --- a/circuitforge_core/resources/__init__.py +++ b/circuitforge_core/resources/__init__.py @@ -0,0 +1 @@ +from circuitforge_core.resources.client import CFOrchClient, Allocation # noqa: F401 diff --git a/circuitforge_core/resources/cli.py b/circuitforge_core/resources/cli.py index 9e65f08..dc6883f 100644 --- a/circuitforge_core/resources/cli.py +++ b/circuitforge_core/resources/cli.py @@ -44,11 +44,17 @@ def start( from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor from circuitforge_core.resources.coordinator.app import create_coordinator_app + from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry from circuitforge_core.resources.agent.gpu_monitor import GpuMonitor lease_manager = LeaseManager() profile_registry = ProfileRegistry() - supervisor = AgentSupervisor(lease_manager=lease_manager) + service_registry = ServiceRegistry() + supervisor = AgentSupervisor( + lease_manager=lease_manager, + service_registry=service_registry, + profile_registry=profile_registry, + ) monitor = GpuMonitor() gpus = monitor.poll() @@ -79,6 +85,7 @@ def start( lease_manager=lease_manager, profile_registry=profile_registry, agent_supervisor=supervisor, + service_registry=service_registry, ) typer.echo(f"Starting cf-orch coordinator on {host}:{port}") @@ -92,6 +99,7 @@ def agent( host: str = "0.0.0.0", port: int = 7701, advertise_host: Optional[str] = None, + profile: Annotated[Optional[Path], typer.Option(help="Profile YAML path")] = None, ) -> None: """Start a cf-orch node agent and self-register with the coordinator. @@ -101,10 +109,11 @@ def agent( Use --advertise-host to override the IP the coordinator should use to reach this agent (e.g. on a multi-homed or NATted host). """ - import asyncio import threading import httpx from circuitforge_core.resources.agent.app import create_agent_app + from circuitforge_core.resources.agent.service_manager import ServiceManager + from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry # The URL the coordinator should use to reach this agent. reach_host = advertise_host or ("127.0.0.1" if host in ("0.0.0.0", "::") else host) @@ -132,7 +141,18 @@ def agent( # Fire registration in a daemon thread so uvicorn.run() can start blocking immediately. threading.Thread(target=_register_in_background, daemon=True).start() - agent_app = create_agent_app(node_id=node_id) + service_manager = None + try: + from circuitforge_core.resources.agent.gpu_monitor import GpuMonitor + pr = ProfileRegistry() + gpus = GpuMonitor().poll() + p = pr.load(Path(profile)) if profile else pr.auto_detect(gpus) + service_manager = ServiceManager(node_id=node_id, profile=p, advertise_host=reach_host) + typer.echo(f"ServiceManager ready with profile: {p.name}") + except Exception as exc: + typer.echo(f"Warning: ServiceManager unavailable ({exc})", err=True) + + agent_app = create_agent_app(node_id=node_id, service_manager=service_manager) typer.echo(f"Starting cf-orch agent [{node_id}] on {host}:{port}") uvicorn.run(agent_app, host=host, port=port) diff --git a/circuitforge_core/resources/client.py b/circuitforge_core/resources/client.py new file mode 100644 index 0000000..fa0a72e --- /dev/null +++ b/circuitforge_core/resources/client.py @@ -0,0 +1,126 @@ +from __future__ import annotations + +import logging +from contextlib import contextmanager, asynccontextmanager +from dataclasses import dataclass + +import httpx + +logger = logging.getLogger(__name__) + + +@dataclass +class Allocation: + allocation_id: str + service: str + node_id: str + gpu_id: int + model: str | None + url: str + started: bool + warm: bool + + +class CFOrchClient: + """ + Client for cf-orch coordinator allocation. + + Sync usage (in LLMRouter or other sync code): + client = CFOrchClient(os.environ["CF_ORCH_URL"]) + with client.allocate("vllm", model_candidates=["Ouro-1.4B"]) as alloc: + # alloc.url is the inference endpoint + + Async usage (in FastAPI apps): + async with client.allocate_async("vllm", model_candidates=["Ouro-1.4B"]) as alloc: + ... + + Raises ValueError immediately if coordinator_url is empty. + """ + + def __init__(self, coordinator_url: str) -> None: + if not coordinator_url: + raise ValueError("coordinator_url is empty — cf-orch not configured") + self._url = coordinator_url.rstrip("/") + + def _build_body(self, model_candidates: list[str] | None, ttl_s: float, caller: str) -> dict: + return { + "model_candidates": model_candidates or [], + "ttl_s": ttl_s, + "caller": caller, + } + + def _parse_allocation(self, data: dict, service: str) -> Allocation: + return Allocation( + allocation_id=data["allocation_id"], + service=service, + node_id=data["node_id"], + gpu_id=data["gpu_id"], + model=data.get("model"), + url=data["url"], + started=data.get("started", False), + warm=data.get("warm", False), + ) + + @contextmanager + def allocate( + self, + service: str, + *, + model_candidates: list[str] | None = None, + ttl_s: float = 3600.0, + caller: str = "", + ): + """Sync context manager. Allocates on enter, releases on exit.""" + resp = httpx.post( + f"{self._url}/api/services/{service}/allocate", + json=self._build_body(model_candidates, ttl_s, caller), + timeout=120.0, + ) + if not resp.is_success: + raise RuntimeError( + f"cf-orch allocation failed for {service!r}: " + f"HTTP {resp.status_code} — {resp.text[:200]}" + ) + alloc = self._parse_allocation(resp.json(), service) + try: + yield alloc + finally: + try: + httpx.delete( + f"{self._url}/api/services/{service}/allocations/{alloc.allocation_id}", + timeout=10.0, + ) + except Exception as exc: + logger.debug("cf-orch release failed (non-fatal): %s", exc) + + @asynccontextmanager + async def allocate_async( + self, + service: str, + *, + model_candidates: list[str] | None = None, + ttl_s: float = 3600.0, + caller: str = "", + ): + """Async context manager. Allocates on enter, releases on exit.""" + async with httpx.AsyncClient(timeout=120.0) as client: + resp = await client.post( + f"{self._url}/api/services/{service}/allocate", + json=self._build_body(model_candidates, ttl_s, caller), + ) + if not resp.is_success: + raise RuntimeError( + f"cf-orch allocation failed for {service!r}: " + f"HTTP {resp.status_code} — {resp.text[:200]}" + ) + alloc = self._parse_allocation(resp.json(), service) + try: + yield alloc + finally: + try: + await client.delete( + f"{self._url}/api/services/{service}/allocations/{alloc.allocation_id}", + timeout=10.0, + ) + except Exception as exc: + logger.debug("cf-orch async release failed (non-fatal): %s", exc) diff --git a/circuitforge_core/resources/coordinator/agent_supervisor.py b/circuitforge_core/resources/coordinator/agent_supervisor.py index 63d92e7..8536636 100644 --- a/circuitforge_core/resources/coordinator/agent_supervisor.py +++ b/circuitforge_core/resources/coordinator/agent_supervisor.py @@ -8,7 +8,9 @@ from dataclasses import dataclass, field import httpx from circuitforge_core.resources.coordinator.lease_manager import LeaseManager -from circuitforge_core.resources.models import GpuInfo, NodeInfo +from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry +from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry +from circuitforge_core.resources.models import GpuInfo, NodeInfo, ResidentAllocation logger = logging.getLogger(__name__) @@ -26,15 +28,27 @@ class AgentRecord: class AgentSupervisor: - def __init__(self, lease_manager: LeaseManager) -> None: + def __init__( + self, + lease_manager: LeaseManager, + service_registry: ServiceRegistry | None = None, + profile_registry: ProfileRegistry | None = None, + ) -> None: self._agents: dict[str, AgentRecord] = {} self._lease_manager = lease_manager self._running = False + self._service_registry = service_registry + self._profile_registry = profile_registry + self._heartbeat_tick = 0 def register(self, node_id: str, agent_url: str) -> None: if node_id not in self._agents: self._agents[node_id] = AgentRecord(node_id=node_id, agent_url=agent_url) logger.info("Registered agent node: %s @ %s", node_id, agent_url) + else: + if self._agents[node_id].agent_url != agent_url: + self._agents[node_id].agent_url = agent_url + logger.info("Updated agent URL for %s → %s", node_id, agent_url) def get_node_info(self, node_id: str) -> NodeInfo | None: record = self._agents.get(node_id) @@ -58,15 +72,27 @@ class AgentSupervisor: for r in self._agents.values() ] + def online_agents(self) -> "dict[str, AgentRecord]": + """Return only currently-online agents, keyed by node_id.""" + return {nid: rec for nid, rec in self._agents.items() if rec.online} + async def poll_agent(self, node_id: str) -> bool: record = self._agents.get(node_id) if record is None: return False try: async with httpx.AsyncClient(timeout=_AGENT_TIMEOUT_S) as client: - resp = await client.get(f"{record.agent_url}/gpu-info") - resp.raise_for_status() - data = resp.json() + gpu_resp = await client.get(f"{record.agent_url}/gpu-info") + gpu_resp.raise_for_status() + + # Resident-info is best-effort — older agents may not have the endpoint. + try: + res_resp = await client.get(f"{record.agent_url}/resident-info") + resident_data = res_resp.json() if res_resp.is_success else {} + except Exception: + resident_data = {} + + data = gpu_resp.json() gpus = [ GpuInfo( gpu_id=g["gpu_id"], @@ -82,6 +108,13 @@ class AgentSupervisor: record.online = True for gpu in gpus: self._lease_manager.register_gpu(node_id, gpu.gpu_id, gpu.vram_total_mb) + + residents = [ + (r["service"], r.get("model_name")) + for r in resident_data.get("residents", []) + ] + self._lease_manager.set_residents_for_node(node_id, residents) + return True except Exception as exc: logger.warning("Agent %s unreachable: %s", node_id, exc) @@ -91,10 +124,58 @@ class AgentSupervisor: async def poll_all(self) -> None: await asyncio.gather(*[self.poll_agent(nid) for nid in self._agents]) + def _build_idle_stop_config(self) -> dict[str, int]: + if self._profile_registry is None: + return {} + config: dict[str, int] = {} + for profile in self._profile_registry.list_public(): + for svc_name, svc in profile.services.items(): + if svc.idle_stop_after_s > 0: + existing = config.get(svc_name, 0) + config[svc_name] = min(existing, svc.idle_stop_after_s) if existing > 0 else svc.idle_stop_after_s + return config + + async def _http_post(self, url: str) -> bool: + try: + async with httpx.AsyncClient(timeout=10.0) as client: + resp = await client.post(url) + return resp.is_success + except Exception as exc: + logger.warning("HTTP POST %s failed: %s", url, exc) + return False + + async def _run_idle_sweep(self) -> None: + if self._service_registry is None: + return + expired = self._service_registry.sweep_expired_allocations() + if expired: + logger.info("TTL sweep: expired %d allocation(s): %s", len(expired), expired) + idle_stop_config = self._build_idle_stop_config() + if not idle_stop_config: + return + timed_out = self._service_registry.idle_past_timeout(idle_stop_config) + for instance in timed_out: + node_info = self.get_node_info(instance.node_id) + if node_info is None: + continue + stop_url = f"{node_info.agent_url}/services/{instance.service}/stop" + logger.info( + "Idle sweep: stopping %s on %s gpu%s (idle timeout)", + instance.service, instance.node_id, instance.gpu_id, + ) + success = await self._http_post(stop_url) + if success: + self._service_registry.mark_stopped( + instance.service, instance.node_id, instance.gpu_id + ) + async def run_heartbeat_loop(self) -> None: self._running = True while self._running: await self.poll_all() + self._heartbeat_tick += 1 + if self._heartbeat_tick % 3 == 0: + await self._run_idle_sweep() await asyncio.sleep(_HEARTBEAT_INTERVAL_S) def stop(self) -> None: diff --git a/circuitforge_core/resources/coordinator/app.py b/circuitforge_core/resources/coordinator/app.py index abf333f..1fd7c91 100644 --- a/circuitforge_core/resources/coordinator/app.py +++ b/circuitforge_core/resources/coordinator/app.py @@ -11,7 +11,9 @@ from pydantic import BaseModel from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor from circuitforge_core.resources.coordinator.eviction_engine import EvictionEngine from circuitforge_core.resources.coordinator.lease_manager import LeaseManager +from circuitforge_core.resources.coordinator.node_selector import select_node from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry +from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry _DASHBOARD_HTML = (Path(__file__).parent / "dashboard.html").read_text() @@ -30,10 +32,29 @@ class NodeRegisterRequest(BaseModel): agent_url: str # e.g. "http://10.1.10.71:7701" +class ServiceEnsureRequest(BaseModel): + node_id: str + gpu_id: int = 0 + params: dict[str, str] = {} + ttl_s: float = 3600.0 + # Ordered list of model names to try; falls back down the list if VRAM is tight. + # The "model" key in params is used if this list is empty. + model_candidates: list[str] = [] + + +class ServiceAllocateRequest(BaseModel): + model_candidates: list[str] = [] + gpu_id: int | None = None + params: dict[str, str] = {} + ttl_s: float = 3600.0 + caller: str = "" + + def create_coordinator_app( lease_manager: LeaseManager, profile_registry: ProfileRegistry, agent_supervisor: AgentSupervisor, + service_registry: ServiceRegistry, ) -> FastAPI: eviction_engine = EvictionEngine(lease_manager=lease_manager) @@ -95,6 +116,20 @@ def create_coordinator_app( ] } + @app.get("/api/resident") + def get_residents() -> dict[str, Any]: + return { + "residents": [ + { + "service": r.service, + "node_id": r.node_id, + "model_name": r.model_name, + "first_seen": r.first_seen, + } + for r in lease_manager.all_residents() + ] + } + @app.get("/api/leases") def get_leases() -> dict[str, Any]: return { @@ -155,4 +190,232 @@ def create_coordinator_app( raise HTTPException(status_code=404, detail=f"Lease {lease_id!r} not found") return {"released": True, "lease_id": lease_id} + @app.post("/api/services/{service}/ensure") + async def ensure_service(service: str, req: ServiceEnsureRequest) -> dict[str, Any]: + """ + Ensure a managed service is running on the given node. + + If model_candidates is provided, tries each model in order, skipping any + that exceed the live free VRAM on the target GPU. Falls back down the list + until one succeeds. The selected model is returned in the response. + """ + import httpx + + node_info = agent_supervisor.get_node_info(req.node_id) + if node_info is None: + raise HTTPException(422, detail=f"Unknown node_id {req.node_id!r}") + + # Resolve candidate list — fall back to params["model"] if not specified. + candidates: list[str] = req.model_candidates or ( + [req.params["model"]] if "model" in req.params else [] + ) + if not candidates: + raise HTTPException(422, detail="No model specified: set params.model or model_candidates") + + # Live free VRAM on the target GPU (used for pre-flight filtering). + gpu = next((g for g in node_info.gpus if g.gpu_id == req.gpu_id), None) + free_mb = gpu.vram_free_mb if gpu else 0 + + # Profile max_mb for the service gives us the VRAM ceiling for this slot. + # Models larger than free_mb are skipped before we even try to start them. + # We use model file size as a rough proxy — skip if free_mb < half of max_mb, + # since a fully-loaded model typically needs ~50-80% of its param size in VRAM. + service_max_mb = 0 + for p in profile_registry.list_public(): + svc = p.services.get(service) + if svc: + service_max_mb = svc.max_mb + break + + # Filter candidates by VRAM headroom — skip models where free VRAM + # is less than half of the service's max_mb ceiling. + if service_max_mb > 0 and free_mb < service_max_mb // 2: + raise HTTPException( + 503, + detail=f"Insufficient VRAM on gpu {req.gpu_id}: {free_mb}MB free, need at least {service_max_mb // 2}MB", + ) + + last_error: str = "" + async with httpx.AsyncClient(timeout=120.0) as client: + for model in candidates: + params_with_model = {**req.params, "model": model} + try: + start_resp = await client.post( + f"{node_info.agent_url}/services/{service}/start", + json={"gpu_id": req.gpu_id, "params": params_with_model}, + ) + if start_resp.is_success: + data = start_resp.json() + return { + "service": service, + "node_id": req.node_id, + "gpu_id": req.gpu_id, + "model": model, + "url": data.get("url"), + "running": data.get("running", False), + } + last_error = start_resp.text + except httpx.HTTPError as exc: + raise HTTPException(502, detail=f"Agent unreachable: {exc}") + + raise HTTPException( + 503, + detail=f"All model candidates exhausted for {service!r}. Last error: {last_error}", + ) + + @app.post("/api/services/{service}/allocate") + async def allocate_service(service: str, req: ServiceAllocateRequest) -> dict[str, Any]: + """ + Allocate a managed service — coordinator picks the best node automatically. + Returns a URL + allocation_id. (Allocation not tracked server-side until Phase 2.) + """ + import httpx + + if not req.model_candidates: + raise HTTPException(422, detail="model_candidates must be non-empty") + + # Validate service is known in at least one profile, regardless of gpu_id + if not any(service in p.services for p in profile_registry.list_public()): + raise HTTPException(422, detail=f"Unknown service {service!r} — not in any profile") + + residents = lease_manager.resident_keys() + + if req.gpu_id is None: + online = agent_supervisor.online_agents() + placement = select_node(online, service, profile_registry, residents) + if placement is None: + raise HTTPException( + 503, + detail=f"No online node has capacity for service {service!r}", + ) + node_id, gpu_id = placement + else: + online = agent_supervisor.online_agents() + node_id = next( + (nid for nid, rec in online.items() + if any(g.gpu_id == req.gpu_id for g in rec.gpus)), + None, + ) + if node_id is None: + raise HTTPException(422, detail=f"No online node has gpu_id={req.gpu_id}") + gpu_id = req.gpu_id + + node_info = agent_supervisor.get_node_info(node_id) + if node_info is None: + raise HTTPException(422, detail=f"Node {node_id!r} not found") + + warm = f"{node_id}:{service}" in residents + + async with httpx.AsyncClient(timeout=120.0) as client: + last_error = "" + for model in req.model_candidates: + try: + resp = await client.post( + f"{node_info.agent_url}/services/{service}/start", + json={"gpu_id": gpu_id, "params": {**req.params, "model": model}}, + ) + if resp.is_success: + data = resp.json() + alloc = service_registry.allocate( + service=service, + node_id=node_id, + gpu_id=gpu_id, + model=model, + caller=req.caller, + url=data.get("url", ""), + ttl_s=req.ttl_s, + ) + return { + "allocation_id": alloc.allocation_id, + "service": service, + "node_id": node_id, + "gpu_id": gpu_id, + "model": model, + "url": data.get("url"), + "started": not warm, + "warm": warm, + } + last_error = resp.text + except httpx.HTTPError as exc: + raise HTTPException(502, detail=f"Agent unreachable: {exc}") + + raise HTTPException( + 503, + detail=f"All model candidates exhausted for {service!r}. Last error: {last_error}", + ) + + @app.delete("/api/services/{service}/allocations/{allocation_id}") + async def release_allocation(service: str, allocation_id: str) -> dict[str, Any]: + existing = service_registry.get_allocation(allocation_id) + if existing is None or existing.service != service: + raise HTTPException(404, detail=f"Allocation {allocation_id!r} not found for service {service!r}") + released = service_registry.release(allocation_id) + if not released: + raise HTTPException(404, detail=f"Allocation {allocation_id!r} not found") + return {"released": True, "allocation_id": allocation_id} + + @app.get("/api/services/{service}/status") + def get_service_status(service: str) -> dict[str, Any]: + instances = [i for i in service_registry.all_instances() if i.service == service] + allocations = [a for a in service_registry.all_allocations() if a.service == service] + return { + "service": service, + "instances": [ + { + "node_id": i.node_id, + "gpu_id": i.gpu_id, + "state": i.state, + "model": i.model, + "url": i.url, + "idle_since": i.idle_since, + } + for i in instances + ], + "allocations": [ + { + "allocation_id": a.allocation_id, + "node_id": a.node_id, + "gpu_id": a.gpu_id, + "model": a.model, + "caller": a.caller, + "url": a.url, + "expires_at": a.expires_at, + } + for a in allocations + ], + } + + @app.get("/api/services") + def list_services() -> dict[str, Any]: + instances = service_registry.all_instances() + return { + "services": [ + { + "service": i.service, + "node_id": i.node_id, + "gpu_id": i.gpu_id, + "state": i.state, + "model": i.model, + "url": i.url, + } + for i in instances + ] + } + + @app.delete("/api/services/{service}") + async def stop_service(service: str, node_id: str) -> dict[str, Any]: + """Stop a managed service on the given node.""" + node_info = agent_supervisor.get_node_info(node_id) + if node_info is None: + raise HTTPException(422, detail=f"Unknown node_id {node_id!r}") + + import httpx + async with httpx.AsyncClient(timeout=30.0) as client: + try: + resp = await client.post(f"{node_info.agent_url}/services/{service}/stop") + resp.raise_for_status() + return {"service": service, "node_id": node_id, "stopped": resp.json().get("stopped", False)} + except httpx.HTTPError as exc: + raise HTTPException(502, detail=f"Agent unreachable: {exc}") + return app diff --git a/circuitforge_core/resources/coordinator/dashboard.html b/circuitforge_core/resources/coordinator/dashboard.html index 79fc9cb..a657111 100644 --- a/circuitforge_core/resources/coordinator/dashboard.html +++ b/circuitforge_core/resources/coordinator/dashboard.html @@ -52,8 +52,9 @@ .gpu-node { font-size: 0.75em; font-weight: 700; color: var(--indigo); margin-bottom: 1px; } .gpu-offline .gpu-node { color: var(--orange); } .gpu-name { font-size: 0.78em; color: var(--text); margin-bottom: 0.4rem; } - .vram-track { background: var(--bg); border-radius: var(--radius-sm); height: 6px; margin-bottom: 0.3rem; } - .vram-fill { height: 100%; border-radius: var(--radius-sm); transition: width 0.4s; } + .vram-track { position: relative; background: var(--bg); border-radius: var(--radius-sm); height: 6px; margin-bottom: 0.3rem; overflow: hidden; } + .vram-leased { position: absolute; left: 0; top: 0; height: 100%; background: var(--cyan); transition: width 0.4s; } + .vram-resident { position: absolute; top: 0; height: 100%; background: var(--amber); transition: left 0.4s, width 0.4s; } .vram-label { font-size: 0.72em; color: var(--muted); margin-bottom: 0.25rem; } .gpu-status { font-size: 0.72em; } .gpu-status.idle { color: var(--green); } @@ -62,21 +63,30 @@ .gpu-status.offline { color: var(--orange); } .spark-track { height: 24px; background: var(--bg); border-radius: var(--radius-sm); margin-top: 0.4rem; overflow: hidden; } - /* leases */ - #leases-table { width: 100%; border-collapse: collapse; background: var(--bg2); border: 1px solid var(--border); border-radius: var(--radius); overflow: hidden; margin-bottom: 1rem; } - #leases-table th { background: var(--bg3); color: var(--dim); font-size: 0.72em; font-weight: 600; text-transform: uppercase; letter-spacing: 0.05em; padding: 0.4rem 0.6rem; text-align: left; border-bottom: 1px solid var(--border); } - #leases-table td { padding: 0.35rem 0.6rem; border-bottom: 1px solid var(--border-dim); font-size: 0.8em; vertical-align: middle; } - #leases-table tr:last-child td { border-bottom: none; } + /* shared table base */ + .cf-table { width: 100%; border-collapse: collapse; background: var(--bg2); border: 1px solid var(--border); border-radius: var(--radius); overflow: hidden; margin-bottom: 1rem; } + .cf-table th { background: var(--bg3); color: var(--dim); font-size: 0.72em; font-weight: 600; text-transform: uppercase; letter-spacing: 0.05em; padding: 0.4rem 0.6rem; text-align: left; border-bottom: 1px solid var(--border); } + .cf-table td { padding: 0.35rem 0.6rem; border-bottom: 1px solid var(--border-dim); font-size: 0.8em; vertical-align: middle; } + .cf-table tr:last-child td { border-bottom: none; } .td-service { color: var(--indigo); font-weight: 600; } .td-node { color: var(--muted); } .td-mb { color: var(--text); } .td-priority { color: var(--amber); } + .td-model { color: var(--cyan); font-size: 0.75em; } + .td-warm { color: var(--amber); } .td-none { color: var(--dim); font-style: italic; } .ttl-wrap { display: flex; align-items: center; gap: 0.5rem; } .ttl-label { color: var(--cyan); font-variant-numeric: tabular-nums; white-space: nowrap; } .ttl-track { flex: 1; background: var(--bg); border-radius: var(--radius-sm); height: 4px; } .ttl-fill { height: 100%; border-radius: var(--radius-sm); background: var(--cyan); transition: width 0.4s; } + /* service state classes */ + .state-running { color: #2ecc40; } + .state-idle { color: #ff851b; } + .state-stopped { color: #aaa; } + .state-starting { color: #0074d9; } + .state-unknown { color: #ff4136; } + /* error */ #error-banner { display: none; background: rgba(248,81,73,.1); border: 1px solid var(--red); border-radius: var(--radius); color: var(--red); padding: 0.5rem 0.75rem; font-size: 0.82em; margin-bottom: 1rem; } @@ -102,8 +112,20 @@
GPU Nodes
+
+
Service Instances
+ + + + + + + +
ServiceNodeGPUStateModelURL
+
+
Active Leases
- +
@@ -112,10 +134,22 @@
ServiceNode / GPUVRAMPriorityTTL / Expires
+
Warm Models
+ + + + + + + +
ServiceNodeModelWarm Since
+ @@ -198,6 +232,41 @@ setInterval(() => { document.getElementById('countdown').textContent = countdown; }, 1000); +// ── state class helper ─────────────────────────────────────────── +function stateClass(state) { + const map = { running: 'state-running', idle: 'state-idle', stopped: 'state-stopped', starting: 'state-starting' }; + return map[state] || 'state-unknown'; +} + +// ── render: services table ─────────────────────────────────────── +function renderServices(services) { + const tbody = document.getElementById('services-body'); + if (!services || services.length === 0) { + const tr = document.createElement('tr'); + const td = el('td', { cls: 'td-none', text: 'No service instances registered.' }); + td.setAttribute('colspan', '6'); + tr.appendChild(td); + setChildren(tbody, tr); + return; + } + + const rows = services.map(svc => { + const tr = document.createElement('tr'); + const fields = [ + { text: svc.service, cls: 'td-service' }, + { text: svc.node_id, cls: 'td-node' }, + { text: String(svc.gpu_id), cls: 'td-mb' }, + { text: svc.state, cls: stateClass(svc.state) }, + { text: svc.model || '\u2014', cls: 'td-model' }, + { text: svc.url || '\u2014', cls: 'td-node' }, + ]; + fields.forEach(f => tr.appendChild(el('td', { cls: f.cls, text: f.text }))); + return tr; + }); + + setChildren(tbody, ...rows); +} + // ── render: health strip ───────────────────────────────────────── function renderHealth(ok) { const strip = document.getElementById('health-strip'); @@ -206,7 +275,8 @@ function renderHealth(ok) { } // ── render: GPU grid ───────────────────────────────────────────── -function renderNodes(nodes) { +// leasedByGpu: "nodeId:gpuId" → total MB currently leased (from active leases) +function renderNodes(nodes, leasedByGpu) { const grid = document.getElementById('gpu-grid'); if (!nodes || nodes.length === 0) { setChildren(grid, el('div', { text: 'No nodes registered.', style: { color: 'var(--dim)', fontSize: '0.8em', padding: '0.5rem' } })); @@ -216,33 +286,46 @@ function renderNodes(nodes) { const cards = []; for (const node of nodes) { for (const gpu of node.gpus) { - const key = node.node_id + ':' + gpu.gpu_id; - const pct = gpu.vram_total_mb > 0 ? gpu.vram_used_mb / gpu.vram_total_mb : 0; - const usedGb = (gpu.vram_used_mb / 1024).toFixed(1); - const totalGb = (gpu.vram_total_mb / 1024).toFixed(1); - const color = vramColor(pct); + const key = node.node_id + ':' + gpu.gpu_id; + const total = gpu.vram_total_mb || 1; + const used = gpu.vram_used_mb; + const leased = leasedByGpu[key] || 0; + // Resident = nvidia-smi used minus actively leased; clamped to [0, used]. + const resident = Math.max(0, Math.min(used - leased, used)); + const pct = used / total; if (!sparkHistory[key]) sparkHistory[key] = []; - sparkHistory[key].push(gpu.vram_used_mb); + sparkHistory[key].push(used); if (sparkHistory[key].length > 20) sparkHistory[key].shift(); const statusCls = pct >= 0.9 ? 'full' : pct >= 0.1 ? 'busy' : 'idle'; const statusText = pct >= 0.9 ? 'saturated' : pct >= 0.1 ? Math.round(pct * 100) + '% used' : 'idle'; - const card = el('div', { cls: 'gpu-card' }); - + const card = el('div', { cls: 'gpu-card' }); const nodeLabel = el('div', { cls: 'gpu-node', text: node.node_id.toUpperCase() + ' · GPU ' + gpu.gpu_id }); const nameLine = el('div', { cls: 'gpu-name', text: gpu.name || 'Unknown GPU' }); - const track = el('div', { cls: 'vram-track' }); - const fill = el('div', { cls: 'vram-fill', style: { width: (pct * 100).toFixed(1) + '%', background: color } }); - track.appendChild(fill); + // Stacked bar: cyan (leased) → amber (resident) → dark bg (free). + const leasedPct = (leased / total * 100).toFixed(1); + const residentPct = (resident / total * 100).toFixed(1); + const track = el('div', { cls: 'vram-track' }); + const fillLeased = el('div', { cls: 'vram-leased', style: { width: leasedPct + '%' } }); + const fillResident = el('div', { cls: 'vram-resident', style: { left: leasedPct + '%', width: residentPct + '%' } }); + append(track, fillLeased, fillResident); - const vramLbl = el('div', { cls: 'vram-label', text: usedGb + ' / ' + totalGb + ' GB' }); - const statusEl = el('div', { cls: 'gpu-status ' + statusCls, text: statusText }); + // Breakdown label when something is allocated. + let labelText = (used / 1024).toFixed(1) + ' / ' + (total / 1024).toFixed(1) + ' GB'; + if (leased > 0 || resident > 0) { + const parts = []; + if (leased > 0) parts.push((leased / 1024).toFixed(1) + 'G leased'); + if (resident > 0) parts.push((resident / 1024).toFixed(1) + 'G resident'); + labelText += ' (' + parts.join(' · ') + ')'; + } + const vramLbl = el('div', { cls: 'vram-label', text: labelText }); + const statusEl = el('div', { cls: 'gpu-status ' + statusCls, text: statusText }); const sparkTrack = el('div', { cls: 'spark-track' }); - sparkTrack.appendChild(buildSparkline(sparkHistory[key], gpu.vram_total_mb)); + sparkTrack.appendChild(buildSparkline(sparkHistory[key], total)); append(card, nodeLabel, nameLine, track, vramLbl, statusEl, sparkTrack); cards.push(card); @@ -252,6 +335,40 @@ function renderNodes(nodes) { setChildren(grid, ...cards); } +// ── render: warm models table ──────────────────────────────────── +function renderResidents(residents) { + const tbody = document.getElementById('resident-body'); + if (!residents || residents.length === 0) { + const tr = document.createElement('tr'); + const td = el('td', { cls: 'td-none', text: 'No warm models detected.' }); + td.setAttribute('colspan', '4'); + tr.appendChild(td); + setChildren(tbody, tr); + return; + } + + const now = Date.now() / 1000; + const rows = residents.map(r => { + const warmSecs = now - (r.first_seen || now); + const warmText = warmSecs < 60 + ? Math.floor(warmSecs) + 's' + : warmSecs < 3600 + ? Math.floor(warmSecs / 60) + 'm ' + String(Math.floor(warmSecs % 60)).padStart(2, '0') + 's' + : Math.floor(warmSecs / 3600) + 'h ' + String(Math.floor((warmSecs % 3600) / 60)).padStart(2, '0') + 'm'; + + const tr = document.createElement('tr'); + append(tr, + el('td', { cls: 'td-service', text: r.service }), + el('td', { cls: 'td-node', text: r.node_id }), + el('td', { cls: 'td-model', text: r.model_name || '—' }), + el('td', { cls: 'td-warm', text: warmText }), + ); + return tr; + }); + + setChildren(tbody, ...rows); +} + // ── render: leases table ───────────────────────────────────────── function renderLeases(leases) { const tbody = document.getElementById('leases-body'); @@ -316,17 +433,33 @@ function clearError() { document.getElementById('error-banner').style.display = // ── poll ───────────────────────────────────────────────────────── async function poll() { try { - const [nodesRes, leasesRes, healthRes] = await Promise.all([ + const [nodesRes, leasesRes, residentRes, healthRes, servicesRes] = await Promise.all([ fetch('/api/nodes'), fetch('/api/leases'), + fetch('/api/resident'), fetch('/api/health'), + fetch('/api/services'), ]); if (!nodesRes.ok || !leasesRes.ok) throw new Error('API error: ' + nodesRes.status); - const [nodesData, leasesData] = await Promise.all([nodesRes.json(), leasesRes.json()]); + const [nodesData, leasesData, residentData, servicesData] = await Promise.all([ + nodesRes.json(), leasesRes.json(), + residentRes.ok ? residentRes.json() : Promise.resolve({ residents: [] }), + servicesRes.ok ? servicesRes.json() : Promise.resolve({ services: [] }), + ]); + + // Build per-GPU leased-MB index for the stacked bar. + const leasedByGpu = {}; + for (const lease of (leasesData.leases || [])) { + const key = lease.node_id + ':' + lease.gpu_id; + leasedByGpu[key] = (leasedByGpu[key] || 0) + lease.mb_granted; + } + clearError(); renderHealth(healthRes.ok); - renderNodes(nodesData.nodes || []); + renderNodes(nodesData.nodes || [], leasedByGpu); + renderServices(servicesData.services || []); renderLeases(leasesData.leases || []); + renderResidents(residentData.residents || []); } catch (err) { showError('Failed to reach coordinator: ' + err.message); renderHealth(false); diff --git a/circuitforge_core/resources/coordinator/lease_manager.py b/circuitforge_core/resources/coordinator/lease_manager.py index 652b64b..80c7c65 100644 --- a/circuitforge_core/resources/coordinator/lease_manager.py +++ b/circuitforge_core/resources/coordinator/lease_manager.py @@ -3,7 +3,7 @@ from __future__ import annotations import asyncio from collections import defaultdict -from circuitforge_core.resources.models import VRAMLease +from circuitforge_core.resources.models import ResidentAllocation, VRAMLease class LeaseManager: @@ -12,6 +12,9 @@ class LeaseManager: self._gpu_total: dict[tuple[str, int], int] = {} self._gpu_used: dict[tuple[str, int], int] = defaultdict(int) self._lock = asyncio.Lock() + # Resident allocations — keyed "node_id:service", updated by heartbeat. + # No lock needed: only the single heartbeat task writes this dict. + self._residents: dict[str, ResidentAllocation] = {} def register_gpu(self, node_id: str, gpu_id: int, total_mb: int) -> None: self._gpu_total[(node_id, gpu_id)] = total_mb @@ -86,3 +89,42 @@ class LeaseManager: def all_leases(self) -> list[VRAMLease]: return list(self._leases.values()) + + # ── resident tracking ──────────────────────────────────────────── + + def set_residents_for_node( + self, + node_id: str, + residents: list[tuple[str, str | None]], # (service, model_name) + ) -> None: + """ + Replace the resident snapshot for a node. + + Preserves first_seen for entries whose service+model_name are unchanged, + so the dashboard can show how long a model has been warm. + """ + new_keys = {f"{node_id}:{service}" for service, _ in residents} + + # Remove stale entries (service no longer running on this node). + for key in list(self._residents): + if key.startswith(f"{node_id}:") and key not in new_keys: + del self._residents[key] + + # Upsert: preserve first_seen when model is unchanged, reset otherwise. + for service, model_name in residents: + key = f"{node_id}:{service}" + existing = self._residents.get(key) + if existing is not None and existing.model_name == model_name: + continue # same model still loaded — keep original first_seen + self._residents[key] = ResidentAllocation( + service=service, + node_id=node_id, + model_name=model_name, + ) + + def all_residents(self) -> list[ResidentAllocation]: + return list(self._residents.values()) + + def resident_keys(self) -> set[str]: + """Return set of 'node_id:service' strings for currently-warm services.""" + return set(self._residents.keys()) diff --git a/circuitforge_core/resources/coordinator/node_selector.py b/circuitforge_core/resources/coordinator/node_selector.py new file mode 100644 index 0000000..9cdb9f4 --- /dev/null +++ b/circuitforge_core/resources/coordinator/node_selector.py @@ -0,0 +1,74 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from circuitforge_core.resources.coordinator.agent_supervisor import AgentRecord + from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry + +_WARM_BONUS_MB = 1000 + + +@dataclass(frozen=True) +class _Scored: + node_id: str + gpu_id: int + vram_free_mb: int + effective_free_mb: int + can_fit: bool + warm: bool + + +def select_node( + agents: "dict[str, AgentRecord]", + service: str, + profile_registry: "ProfileRegistry", + resident_keys: set[str], +) -> tuple[str, int] | None: + """ + Pick the best (node_id, gpu_id) for the requested service. + Warm nodes (service already running) get priority, then sorted by free VRAM. + Returns None if no suitable node exists. + """ + service_max_mb = _find_service_max_mb(service, profile_registry) + if service_max_mb is None: + return None # service not in any profile + + candidates: list[_Scored] = [] + for node_id, record in agents.items(): + if not record.online: + continue + for gpu in record.gpus: + warm = f"{node_id}:{service}" in resident_keys + effective = gpu.vram_free_mb + (_WARM_BONUS_MB if warm else 0) + can_fit = gpu.vram_free_mb >= service_max_mb // 2 + candidates.append(_Scored( + node_id=node_id, + gpu_id=gpu.gpu_id, + vram_free_mb=gpu.vram_free_mb, + effective_free_mb=effective, + can_fit=can_fit, + warm=warm, + )) + if not candidates: + return None + # Prefer: (1) warm nodes (model already resident — no cold start) + # (2) cold nodes that can fit the service (free >= half of max_mb) + # Fallback: best-effort node when nothing fits and nothing is warm + # (coordinator will attempt to start the service anyway; it may evict or fail) + # Note: resident_keys are per-node, not per-GPU. On multi-GPU nodes, the warm + # bonus applies to all GPUs on the node. This is a known coarseness — + # per-GPU resident tracking requires a resident_key format change. + preferred = [c for c in candidates if c.warm or c.can_fit] + pool = preferred if preferred else candidates + best = max(pool, key=lambda c: (c.warm, c.effective_free_mb)) + return best.node_id, best.gpu_id + + +def _find_service_max_mb(service: str, profile_registry: "ProfileRegistry") -> int | None: + for profile in profile_registry.list_public(): + svc = profile.services.get(service) + if svc is not None: + return svc.max_mb + return None diff --git a/circuitforge_core/resources/coordinator/service_registry.py b/circuitforge_core/resources/coordinator/service_registry.py new file mode 100644 index 0000000..c258ad7 --- /dev/null +++ b/circuitforge_core/resources/coordinator/service_registry.py @@ -0,0 +1,170 @@ +from __future__ import annotations + +import dataclasses +import time +import uuid +from dataclasses import dataclass +from typing import Literal + + +@dataclass +class ServiceAllocation: + allocation_id: str + service: str + node_id: str + gpu_id: int + model: str | None + caller: str + url: str + created_at: float + expires_at: float # 0 = no expiry + + +@dataclass +class ServiceInstance: + service: str + node_id: str + gpu_id: int + state: Literal["starting", "running", "idle", "stopped"] + model: str | None + url: str | None + idle_since: float | None = None + + +class ServiceRegistry: + """ + In-memory registry of service allocations and instance state. + + Allocations: per-caller request — many per service instance. + Instances: per (service, node_id, gpu_id) — one per running container. + """ + + def __init__(self) -> None: + self._allocations: dict[str, ServiceAllocation] = {} + self._instances: dict[str, ServiceInstance] = {} # key: "service:node_id:gpu_id" + + # ── allocation API ──────────────────────────────────────────────── + + def allocate( + self, + service: str, + node_id: str, + gpu_id: int, + model: str | None, + url: str, + caller: str, + ttl_s: float, + ) -> ServiceAllocation: + alloc = ServiceAllocation( + allocation_id=str(uuid.uuid4()), + service=service, + node_id=node_id, + gpu_id=gpu_id, + model=model, + caller=caller, + url=url, + created_at=time.time(), + expires_at=time.time() + ttl_s if ttl_s > 0 else 0.0, + ) + self._allocations[alloc.allocation_id] = alloc + + # If an instance exists in idle/stopped state, mark it running again + key = f"{service}:{node_id}:{gpu_id}" + if key in self._instances: + inst = self._instances[key] + if inst.state in ("idle", "stopped"): + self._instances[key] = dataclasses.replace( + inst, state="running", idle_since=None + ) + return alloc + + def release(self, allocation_id: str) -> bool: + alloc = self._allocations.pop(allocation_id, None) + if alloc is None: + return False + # If no active allocations remain for this instance, mark it idle + key = f"{alloc.service}:{alloc.node_id}:{alloc.gpu_id}" + if self.active_allocations(alloc.service, alloc.node_id, alloc.gpu_id) == 0: + if key in self._instances: + self._instances[key] = dataclasses.replace( + self._instances[key], state="idle", idle_since=time.time() + ) + return True + + def active_allocations(self, service: str, node_id: str, gpu_id: int) -> int: + return sum( + 1 for a in self._allocations.values() + if a.service == service and a.node_id == node_id and a.gpu_id == gpu_id + ) + + # ── instance API ───────────────────────────────────────────────── + + def upsert_instance( + self, + service: str, + node_id: str, + gpu_id: int, + state: Literal["starting", "running", "idle", "stopped"], + model: str | None, + url: str | None, + ) -> ServiceInstance: + key = f"{service}:{node_id}:{gpu_id}" + existing = self._instances.get(key) + idle_since: float | None = None + if state == "idle": + # Preserve idle_since if already idle; set now if transitioning into idle + idle_since = existing.idle_since if (existing and existing.state == "idle") else time.time() + inst = ServiceInstance( + service=service, node_id=node_id, gpu_id=gpu_id, + state=state, model=model, url=url, idle_since=idle_since, + ) + self._instances[key] = inst + return inst + + def get_allocation(self, allocation_id: str) -> ServiceAllocation | None: + return self._allocations.get(allocation_id) + + def sweep_expired_allocations(self) -> list[str]: + """ + Remove all allocations whose TTL has elapsed and transition the + corresponding instance to 'idle' if no active allocations remain. + Returns the list of expired allocation_ids. + """ + now = time.time() + expired = [ + alloc_id + for alloc_id, alloc in self._allocations.items() + if alloc.expires_at > 0 and now > alloc.expires_at + ] + for alloc_id in expired: + self.release(alloc_id) + return expired + + def all_allocations(self) -> list[ServiceAllocation]: + return list(self._allocations.values()) + + def all_instances(self) -> list[ServiceInstance]: + return list(self._instances.values()) + + def mark_stopped(self, service: str, node_id: str, gpu_id: int) -> None: + """Transition an instance to 'stopped' state and clear idle_since.""" + key = f"{service}:{node_id}:{gpu_id}" + if key in self._instances: + self._instances[key] = dataclasses.replace( + self._instances[key], state="stopped", idle_since=None + ) + + def idle_past_timeout(self, idle_stop_config: dict[str, int]) -> list[ServiceInstance]: + """ + Return instances in 'idle' state whose idle time exceeds their configured timeout. + idle_stop_config: {service_name: seconds} — 0 means never stop automatically. + """ + now = time.time() + result = [] + for inst in self._instances.values(): + if inst.state != "idle" or inst.idle_since is None: + continue + timeout = idle_stop_config.get(inst.service, 0) + if timeout > 0 and (now - inst.idle_since) >= timeout: + result.append(inst) + return result diff --git a/circuitforge_core/resources/profiles/public/single-gpu-16gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-16gb.yaml index 7ad59f9..84daf6a 100644 --- a/circuitforge_core/resources/profiles/public/single-gpu-16gb.yaml +++ b/circuitforge_core/resources/profiles/public/single-gpu-16gb.yaml @@ -6,6 +6,7 @@ services: vllm: max_mb: 12288 priority: 1 + idle_stop_after_s: 600 ollama: max_mb: 12288 priority: 1 @@ -14,6 +15,11 @@ services: priority: 2 shared: true max_concurrent: 4 + cf-docuvision: + max_mb: 6144 + priority: 2 + shared: true + max_concurrent: 3 cf-stt: max_mb: 1200 priority: 2 diff --git a/circuitforge_core/resources/profiles/public/single-gpu-24gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-24gb.yaml index 4f98eb8..e0ca256 100644 --- a/circuitforge_core/resources/profiles/public/single-gpu-24gb.yaml +++ b/circuitforge_core/resources/profiles/public/single-gpu-24gb.yaml @@ -6,6 +6,7 @@ services: vllm: max_mb: 20480 priority: 1 + idle_stop_after_s: 600 ollama: max_mb: 18432 priority: 1 @@ -14,6 +15,11 @@ services: priority: 2 shared: true max_concurrent: 6 + cf-docuvision: + max_mb: 8192 + priority: 2 + shared: true + max_concurrent: 4 cf-stt: max_mb: 1200 priority: 2 diff --git a/circuitforge_core/resources/profiles/public/single-gpu-6gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-6gb.yaml index 92168ef..1888aa4 100644 --- a/circuitforge_core/resources/profiles/public/single-gpu-6gb.yaml +++ b/circuitforge_core/resources/profiles/public/single-gpu-6gb.yaml @@ -6,6 +6,17 @@ services: vllm: max_mb: 4096 priority: 1 + idle_stop_after_s: 600 + managed: + type: docker + image: "vllm/vllm-openai:v0.9.2" + port: 8000 + host_port: 8000 + command_template: "--model /models/{model} --trust-remote-code --max-model-len {max_model_len} --gpu-memory-utilization {gpu_mem_util} --enforce-eager --max-num-seqs 8" + volumes: + - "${VLLM_MODELS_DIR:-/Library/Assets/LLM/vllm/models}:/models" + runtime: nvidia + ipc: host ollama: max_mb: 3584 priority: 1 @@ -14,6 +25,11 @@ services: priority: 2 shared: true max_concurrent: 2 + cf-docuvision: + max_mb: 3072 + priority: 2 + shared: true + max_concurrent: 1 cf-stt: max_mb: 600 priority: 2 diff --git a/circuitforge_core/resources/profiles/public/single-gpu-8gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-8gb.yaml index 7053419..614416d 100644 --- a/circuitforge_core/resources/profiles/public/single-gpu-8gb.yaml +++ b/circuitforge_core/resources/profiles/public/single-gpu-8gb.yaml @@ -6,6 +6,17 @@ services: vllm: max_mb: 5120 priority: 1 + idle_stop_after_s: 600 + managed: + type: docker + image: "vllm/vllm-openai:v0.9.2" + port: 8000 + host_port: 8000 + command_template: "--model /models/{model} --trust-remote-code --max-model-len {max_model_len} --gpu-memory-utilization {gpu_mem_util} --enforce-eager --max-num-seqs 8" + volumes: + - "${VLLM_MODELS_DIR:-/Library/Assets/LLM/vllm/models}:/models" + runtime: nvidia + ipc: host ollama: max_mb: 4096 priority: 1 @@ -14,6 +25,11 @@ services: priority: 2 shared: true max_concurrent: 3 + cf-docuvision: + max_mb: 4096 + priority: 2 + shared: true + max_concurrent: 2 cf-stt: max_mb: 1200 priority: 2 @@ -28,6 +44,13 @@ services: comfyui: max_mb: 6144 priority: 4 + managed: + type: process + exec_path: "/opt/miniconda3/envs/comfyui/bin/python" + args_template: "/opt/ComfyUI/main.py --listen 0.0.0.0 --port {port} --cuda-device {gpu_id}" + cwd: "/opt/ComfyUI" + port: 8188 + host_port: 8188 model_size_hints: llm_max_params: 8b image_gen_max: sdxl-fp8 diff --git a/circuitforge_core/resources/profiles/schema.py b/circuitforge_core/resources/profiles/schema.py index ac59020..1ba8ee5 100644 --- a/circuitforge_core/resources/profiles/schema.py +++ b/circuitforge_core/resources/profiles/schema.py @@ -5,22 +5,72 @@ from pathlib import Path from typing import Any import yaml -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, model_validator SUPPORTED_SCHEMA_VERSION = 1 +class DockerSpec(BaseModel): + """Spec for a Docker-managed service.""" + + image: str + port: int + host_port: int + command_template: str = "" + volumes: list[str] = Field(default_factory=list) + env: dict[str, str] = Field(default_factory=dict) + runtime: str = "nvidia" + ipc: str = "host" + + model_config = {"frozen": True} + + +class ProcessSpec(BaseModel): + """Spec for a process-managed service (non-Docker, e.g. conda env).""" + + exec_path: str + args_template: str = "" + cwd: str = "" + env: dict[str, str] = Field(default_factory=dict) + port: int = 0 + host_port: int = 0 + + model_config = {"frozen": True} + + class ServiceProfile(BaseModel): max_mb: int priority: int shared: bool = False max_concurrent: int = 1 always_on: bool = False + idle_stop_after_s: int = 0 backend: str | None = None consumers: list[str] = Field(default_factory=list) + managed: DockerSpec | ProcessSpec | None = None model_config = {"frozen": True} + @model_validator(mode="before") + @classmethod + def _parse_managed(cls, values: Any) -> Any: + if not isinstance(values, dict): + return values + raw = values.get("managed") + if raw is None: + return values + if not isinstance(raw, dict): + return values + spec_type = raw.get("type") + managed_fields = {k: v for k, v in raw.items() if k != "type"} + if spec_type == "docker": + values["managed"] = DockerSpec(**managed_fields) + elif spec_type == "process": + values["managed"] = ProcessSpec(**managed_fields) + else: + raise ValueError(f"Unknown managed service type: {spec_type!r}") + return values + class GpuNodeEntry(BaseModel): id: int diff --git a/tests/test_resources/test_agent_supervisor.py b/tests/test_resources/test_agent_supervisor.py new file mode 100644 index 0000000..f669b62 --- /dev/null +++ b/tests/test_resources/test_agent_supervisor.py @@ -0,0 +1,93 @@ +import asyncio +import time +import pytest +from unittest.mock import AsyncMock, MagicMock, patch +from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor +from circuitforge_core.resources.coordinator.lease_manager import LeaseManager +from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry, ServiceInstance + + +def test_build_idle_stop_config_empty_without_registry(): + lm = LeaseManager() + supervisor = AgentSupervisor(lease_manager=lm) + assert supervisor._build_idle_stop_config() == {} + + +def test_build_idle_stop_config_from_profiles(): + lm = LeaseManager() + mock_svc = MagicMock() + mock_svc.idle_stop_after_s = 600 + mock_profile = MagicMock() + mock_profile.services = {"vllm": mock_svc} + mock_profile_registry = MagicMock() + mock_profile_registry.list_public.return_value = [mock_profile] + + supervisor = AgentSupervisor(lease_manager=lm, profile_registry=mock_profile_registry) + config = supervisor._build_idle_stop_config() + assert config == {"vllm": 600} + + +@pytest.mark.asyncio +async def test_run_idle_sweep_posts_stop(): + lm = LeaseManager() + service_registry = ServiceRegistry() + + # Upsert instance as running, then allocate + release to transition it to idle + service_registry.upsert_instance( + service="vllm", + node_id="heimdall", + gpu_id=0, + state="running", + model="test-model", + url="http://heimdall:8000", + ) + alloc = service_registry.allocate( + service="vllm", + node_id="heimdall", + gpu_id=0, + model="test-model", + url="http://heimdall:8000", + caller="test", + ttl_s=300.0, + ) + service_registry.release(alloc.allocation_id) + + # Backdate idle_since so it exceeds the timeout + import dataclasses + key = "vllm:heimdall:0" + inst = service_registry._instances[key] + service_registry._instances[key] = dataclasses.replace(inst, idle_since=time.time() - 700) + + mock_profile_registry = MagicMock() + mock_svc = MagicMock() + mock_svc.idle_stop_after_s = 600 + mock_profile = MagicMock() + mock_profile.services = {"vllm": mock_svc} + mock_profile_registry.list_public.return_value = [mock_profile] + + supervisor = AgentSupervisor( + lease_manager=lm, + service_registry=service_registry, + profile_registry=mock_profile_registry, + ) + supervisor.register("heimdall", "http://heimdall:7701") + + posted_urls = [] + + async def fake_http_post(url: str) -> bool: + posted_urls.append(url) + return True + + supervisor._http_post = fake_http_post + await supervisor._run_idle_sweep() + + assert len(posted_urls) == 1 + assert posted_urls[0] == "http://heimdall:7701/services/vllm/stop" + + +@pytest.mark.asyncio +async def test_run_idle_sweep_skips_without_registry(): + lm = LeaseManager() + supervisor = AgentSupervisor(lease_manager=lm) + # Should return immediately without error + await supervisor._run_idle_sweep() diff --git a/tests/test_resources/test_client.py b/tests/test_resources/test_client.py new file mode 100644 index 0000000..288fb64 --- /dev/null +++ b/tests/test_resources/test_client.py @@ -0,0 +1,94 @@ +import json +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +import httpretty +from circuitforge_core.resources.client import CFOrchClient, Allocation + +_ALLOC_BODY = ( + '{"allocation_id":"abc123","service":"vllm","node_id":"heimdall",' + '"gpu_id":0,"model":"Ouro-1.4B","url":"http://heimdall:8000","started":false,"warm":true}' +) + + +@httpretty.activate +def test_sync_allocate_returns_allocation(): + httpretty.register_uri( + httpretty.POST, "http://orch:7700/api/services/vllm/allocate", + body=_ALLOC_BODY, content_type="application/json", + ) + httpretty.register_uri( + httpretty.DELETE, "http://orch:7700/api/services/vllm/allocations/abc123", + body='{"released":true}', content_type="application/json", + ) + client = CFOrchClient("http://orch:7700") + with client.allocate("vllm", model_candidates=["Ouro-1.4B"], caller="test") as alloc: + assert isinstance(alloc, Allocation) + assert alloc.url == "http://heimdall:8000" + assert alloc.model == "Ouro-1.4B" + assert alloc.allocation_id == "abc123" + assert httpretty.last_request().method == "DELETE" + + +@httpretty.activate +def test_sync_allocate_ignores_404_on_release(): + httpretty.register_uri( + httpretty.POST, "http://orch:7700/api/services/vllm/allocate", + body='{"allocation_id":"xyz","service":"vllm","node_id":"a","gpu_id":0,' + '"model":"m","url":"http://a:8000","started":false,"warm":false}', + content_type="application/json", + ) + httpretty.register_uri( + httpretty.DELETE, "http://orch:7700/api/services/vllm/allocations/xyz", + status=404, body='{"detail":"not found"}', content_type="application/json", + ) + client = CFOrchClient("http://orch:7700") + with client.allocate("vllm", model_candidates=["m"]) as alloc: + assert alloc.url == "http://a:8000" + # No exception raised — 404 on release is silently ignored + + +@httpretty.activate +def test_sync_allocate_raises_on_503(): + httpretty.register_uri( + httpretty.POST, "http://orch:7700/api/services/vllm/allocate", + status=503, body='{"detail":"no capacity"}', content_type="application/json", + ) + client = CFOrchClient("http://orch:7700") + with pytest.raises(RuntimeError, match="cf-orch allocation failed"): + with client.allocate("vllm", model_candidates=["m"]): + pass + + +async def test_async_allocate_works(): + # httpretty only patches stdlib sockets; httpx async uses anyio sockets so + # we mock httpx.AsyncClient directly instead. + alloc_data = { + "allocation_id": "a1", "service": "vllm", "node_id": "n", + "gpu_id": 0, "model": "m", "url": "http://n:8000", + "started": False, "warm": False, + } + release_data = {"released": True} + + def _make_response(data, status_code=200): + resp = MagicMock() + resp.is_success = status_code < 400 + resp.status_code = status_code + resp.json.return_value = data + return resp + + mock_post = AsyncMock(return_value=_make_response(alloc_data)) + mock_delete = AsyncMock(return_value=_make_response(release_data)) + + mock_async_client = MagicMock() + mock_async_client.post = mock_post + mock_async_client.delete = mock_delete + mock_async_client.__aenter__ = AsyncMock(return_value=mock_async_client) + mock_async_client.__aexit__ = AsyncMock(return_value=False) + + with patch("httpx.AsyncClient", return_value=mock_async_client): + client = CFOrchClient("http://orch:7700") + async with client.allocate_async("vllm", model_candidates=["m"]) as alloc: + assert alloc.url == "http://n:8000" + assert alloc.allocation_id == "a1" + mock_delete.assert_called_once() diff --git a/tests/test_resources/test_coordinator_allocate.py b/tests/test_resources/test_coordinator_allocate.py new file mode 100644 index 0000000..eb9e078 --- /dev/null +++ b/tests/test_resources/test_coordinator_allocate.py @@ -0,0 +1,132 @@ +import pytest +from unittest.mock import AsyncMock, MagicMock, patch +from fastapi.testclient import TestClient +from circuitforge_core.resources.coordinator.app import create_coordinator_app +from circuitforge_core.resources.coordinator.lease_manager import LeaseManager +from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry +from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry +from circuitforge_core.resources.coordinator.agent_supervisor import AgentRecord +from circuitforge_core.resources.models import GpuInfo, NodeInfo + + +def _make_supervisor_mock(online: bool = True): + sup = MagicMock() + record = AgentRecord(node_id="heimdall", agent_url="http://heimdall:7701") + record.gpus = [GpuInfo(0, "RTX 4000", 8192, 0, 8192)] + record.online = online + sup.online_agents.return_value = {"heimdall": record} if online else {} + sup.get_node_info.return_value = NodeInfo( + node_id="heimdall", + agent_url="http://heimdall:7701", + gpus=record.gpus, + last_heartbeat=0.0, + ) + return sup + + +@pytest.fixture +def alloc_client(): + lm = LeaseManager() + pr = ProfileRegistry() + sup = _make_supervisor_mock() + sr = ServiceRegistry() + app = create_coordinator_app(lease_manager=lm, profile_registry=pr, agent_supervisor=sup, service_registry=sr) + return TestClient(app), sup, sr + + +def test_allocate_returns_allocation_id_and_url(alloc_client): + client, sup, sr = alloc_client + with patch("httpx.AsyncClient") as mock_http: + mock_resp = MagicMock() + mock_resp.is_success = True + mock_resp.json.return_value = {"running": True, "url": "http://heimdall:8000"} + mock_http.return_value.__aenter__.return_value.post = AsyncMock(return_value=mock_resp) + + resp = client.post("/api/services/vllm/allocate", json={ + "model_candidates": ["Ouro-1.4B"], + "ttl_s": 300.0, + "caller": "test", + }) + + assert resp.status_code == 200 + data = resp.json() + assert "allocation_id" in data + assert data["service"] == "vllm" + assert data["node_id"] == "heimdall" + assert data["url"] == "http://heimdall:8000" + + +def test_allocate_returns_503_when_no_online_nodes(alloc_client): + client, sup, sr = alloc_client + sup.online_agents.return_value = {} + resp = client.post("/api/services/vllm/allocate", json={"model_candidates": ["Ouro-1.4B"]}) + assert resp.status_code == 503 + + +def test_allocate_returns_422_for_empty_candidates(alloc_client): + client, _, sr = alloc_client + resp = client.post("/api/services/vllm/allocate", json={"model_candidates": []}) + assert resp.status_code == 422 + + +def test_allocate_returns_422_for_unknown_service(alloc_client): + client, _, sr = alloc_client + resp = client.post("/api/services/cf-made-up/allocate", json={"model_candidates": ["x"]}) + assert resp.status_code == 422 + + +def test_allocate_records_in_registry(alloc_client): + client, sup, sr = alloc_client + with patch("httpx.AsyncClient") as mock_http: + mock_resp = MagicMock() + mock_resp.is_success = True + mock_resp.json.return_value = {"running": True, "url": "http://heimdall:8000"} + mock_http.return_value.__aenter__.return_value.post = AsyncMock(return_value=mock_resp) + + resp = client.post("/api/services/vllm/allocate", json={ + "model_candidates": ["Ouro-1.4B"], + "ttl_s": 300.0, + "caller": "test", + }) + + assert resp.status_code == 200 + allocation_id = resp.json()["allocation_id"] + + status_resp = client.get("/api/services/vllm/status") + assert status_resp.status_code == 200 + status_data = status_resp.json() + assert status_data["service"] == "vllm" + alloc_ids = [a["allocation_id"] for a in status_data["allocations"]] + assert allocation_id in alloc_ids + + +def test_release_allocation(alloc_client): + client, sup, sr = alloc_client + with patch("httpx.AsyncClient") as mock_http: + mock_resp = MagicMock() + mock_resp.is_success = True + mock_resp.json.return_value = {"running": True, "url": "http://heimdall:8000"} + mock_http.return_value.__aenter__.return_value.post = AsyncMock(return_value=mock_resp) + + resp = client.post("/api/services/vllm/allocate", json={ + "model_candidates": ["Ouro-1.4B"], + "ttl_s": 300.0, + "caller": "test", + }) + + assert resp.status_code == 200 + allocation_id = resp.json()["allocation_id"] + + del_resp = client.delete(f"/api/services/vllm/allocations/{allocation_id}") + assert del_resp.status_code == 200 + assert del_resp.json() == {"released": True, "allocation_id": allocation_id} + + status_resp = client.get("/api/services/vllm/status") + alloc_ids = [a["allocation_id"] for a in status_resp.json()["allocations"]] + assert allocation_id not in alloc_ids + + +def test_release_allocation_not_found(alloc_client): + client, _, sr = alloc_client + resp = client.delete("/api/services/vllm/allocations/bad-id") + assert resp.status_code == 404 diff --git a/tests/test_resources/test_coordinator_app.py b/tests/test_resources/test_coordinator_app.py index e8f6bbe..49eacbf 100644 --- a/tests/test_resources/test_coordinator_app.py +++ b/tests/test_resources/test_coordinator_app.py @@ -1,10 +1,14 @@ import pytest from unittest.mock import MagicMock +from pathlib import Path from fastapi.testclient import TestClient from circuitforge_core.resources.coordinator.app import create_coordinator_app +from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor from circuitforge_core.resources.coordinator.lease_manager import LeaseManager from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry +from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry from circuitforge_core.resources.models import GpuInfo, NodeInfo +from circuitforge_core.resources.profiles.schema import load_profile @pytest.fixture @@ -32,6 +36,7 @@ def coordinator_client(): lease_manager=lease_manager, profile_registry=profile_registry, agent_supervisor=supervisor, + service_registry=ServiceRegistry(), ) return TestClient(app), lease_manager @@ -112,3 +117,67 @@ def test_dashboard_serves_html(coordinator_client): assert "cf-orch" in resp.text assert "/api/nodes" in resp.text assert "/api/leases" in resp.text + + +def test_online_agents_excludes_offline(): + lm = LeaseManager() + sup = AgentSupervisor(lm) + sup.register("online_node", "http://a:7701") + sup.register("offline_node", "http://b:7701") + sup._agents["online_node"].online = True + sup._agents["offline_node"].online = False + result = sup.online_agents() + assert "online_node" in result + assert "offline_node" not in result + + +def test_resident_keys_returns_set_of_node_service(): + lm = LeaseManager() + lm.set_residents_for_node("heimdall", [("vllm", "Ouro-1.4B"), ("ollama", None)]) + keys = lm.resident_keys() + assert keys == {"heimdall:vllm", "heimdall:ollama"} + + +def test_single_gpu_8gb_profile_has_idle_stop_after_s(): + profile = load_profile( + Path("circuitforge_core/resources/profiles/public/single-gpu-8gb.yaml") + ) + vllm_svc = profile.services.get("vllm") + assert vllm_svc is not None + assert hasattr(vllm_svc, "idle_stop_after_s") + assert vllm_svc.idle_stop_after_s == 600 + + +def test_ensure_service_returns_503_when_vram_too_low(): + """VRAM pre-flight guard fires before any HTTP request when free VRAM < max_mb // 2.""" + # vllm max_mb = 5120 → threshold = 2560 MB; 100 MB free triggers 503. + lease_manager = LeaseManager() + lease_manager.register_gpu("low-vram-node", 0, 512) + profile_registry = ProfileRegistry() + supervisor = MagicMock() + supervisor.get_node_info.return_value = NodeInfo( + node_id="low-vram-node", + agent_url="http://localhost:7701", + gpus=[GpuInfo(gpu_id=0, name="GTX 1050", + vram_total_mb=512, vram_used_mb=412, vram_free_mb=100)], + last_heartbeat=0.0, + ) + supervisor.all_nodes.return_value = [] + app = create_coordinator_app( + lease_manager=lease_manager, + profile_registry=profile_registry, + agent_supervisor=supervisor, + service_registry=ServiceRegistry(), + ) + client = TestClient(app) + + resp = client.post("/api/services/vllm/ensure", json={ + "node_id": "low-vram-node", + "gpu_id": 0, + "params": {"model": "some-model"}, + }) + + assert resp.status_code == 503 + assert "Insufficient VRAM" in resp.json()["detail"] + # Guard must fire before any agent HTTP call is attempted. + supervisor.get_node_info.assert_called_once_with("low-vram-node") diff --git a/tests/test_resources/test_integration.py b/tests/test_resources/test_integration.py index 814bb32..8fa94ad 100644 --- a/tests/test_resources/test_integration.py +++ b/tests/test_resources/test_integration.py @@ -11,6 +11,7 @@ from circuitforge_core.resources.coordinator.lease_manager import LeaseManager from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor from circuitforge_core.resources.coordinator.app import create_coordinator_app +from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry from circuitforge_core.resources.models import GpuInfo, NodeInfo @@ -47,6 +48,7 @@ def system(): lease_manager=lease_manager, profile_registry=profile_registry, agent_supervisor=mock_supervisor, + service_registry=ServiceRegistry(), ) client = TestClient(app) return client, lease_manager diff --git a/tests/test_resources/test_node_selector.py b/tests/test_resources/test_node_selector.py new file mode 100644 index 0000000..9e18a3a --- /dev/null +++ b/tests/test_resources/test_node_selector.py @@ -0,0 +1,56 @@ +import pytest +from circuitforge_core.resources.coordinator.node_selector import select_node +from circuitforge_core.resources.coordinator.agent_supervisor import AgentRecord +from circuitforge_core.resources.models import GpuInfo +from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry + + +def _make_agent(node_id: str, free_mb: int, online: bool = True) -> AgentRecord: + r = AgentRecord(node_id=node_id, agent_url=f"http://{node_id}:7701") + r.gpus = [GpuInfo(gpu_id=0, name="RTX", vram_total_mb=8192, + vram_used_mb=8192 - free_mb, vram_free_mb=free_mb)] + r.online = online + return r + + +def test_selects_node_with_most_free_vram(): + agents = { + "a": _make_agent("a", free_mb=2000), + "b": _make_agent("b", free_mb=6000), + } + registry = ProfileRegistry() + result = select_node(agents, "vllm", registry, resident_keys=set()) + assert result == ("b", 0) + + +def test_prefers_warm_node_even_with_less_free_vram(): + agents = { + "a": _make_agent("a", free_mb=2000), + "b": _make_agent("b", free_mb=6000), + } + registry = ProfileRegistry() + result = select_node(agents, "vllm", registry, resident_keys={"a:vllm"}) + assert result == ("a", 0) + + +def test_excludes_offline_nodes(): + agents = { + "a": _make_agent("a", free_mb=8000, online=False), + "b": _make_agent("b", free_mb=2000, online=True), + } + registry = ProfileRegistry() + result = select_node(agents, "vllm", registry, resident_keys=set()) + assert result == ("b", 0) + + +def test_returns_none_when_no_node_has_profile_for_service(): + agents = {"a": _make_agent("a", free_mb=8000)} + registry = ProfileRegistry() + result = select_node(agents, "cf-nonexistent-service", registry, resident_keys=set()) + assert result is None + + +def test_returns_none_when_no_agents(): + registry = ProfileRegistry() + result = select_node({}, "vllm", registry, resident_keys=set()) + assert result is None diff --git a/tests/test_resources/test_service_registry.py b/tests/test_resources/test_service_registry.py new file mode 100644 index 0000000..dc73a9c --- /dev/null +++ b/tests/test_resources/test_service_registry.py @@ -0,0 +1,86 @@ +import time +import dataclasses +import pytest +from circuitforge_core.resources.coordinator.service_registry import ( + ServiceRegistry, ServiceAllocation, ServiceInstance, +) + + +@pytest.fixture +def registry(): + return ServiceRegistry() + + +def test_allocate_creates_allocation(registry): + alloc = registry.allocate( + service="vllm", node_id="heimdall", gpu_id=0, + model="Ouro-1.4B", url="http://heimdall:8000", + caller="test", ttl_s=300.0, + ) + assert alloc.service == "vllm" + assert alloc.node_id == "heimdall" + assert alloc.allocation_id # non-empty UUID string + + +def test_active_allocations_count(registry): + registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "a", 300.0) + registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "b", 300.0) + assert registry.active_allocations("vllm", "heimdall", 0) == 2 + + +def test_release_decrements_count(registry): + alloc = registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "a", 300.0) + registry.release(alloc.allocation_id) + assert registry.active_allocations("vllm", "heimdall", 0) == 0 + + +def test_release_nonexistent_returns_false(registry): + assert registry.release("nonexistent-id") is False + + +def test_upsert_instance_sets_running_state(registry): + registry.upsert_instance("vllm", "heimdall", 0, state="running", + model="Ouro-1.4B", url="http://heimdall:8000") + instances = registry.all_instances() + assert len(instances) == 1 + assert instances[0].state == "running" + + +def test_release_last_alloc_marks_instance_idle(registry): + registry.upsert_instance("vllm", "heimdall", 0, state="running", + model="Ouro-1.4B", url="http://heimdall:8000") + alloc = registry.allocate("vllm", "heimdall", 0, "Ouro-1.4B", "http://heimdall:8000", "a", 300.0) + registry.release(alloc.allocation_id) + instance = registry.all_instances()[0] + assert instance.state == "idle" + assert instance.idle_since is not None + + +def test_new_alloc_on_idle_instance_marks_it_running(registry): + registry.upsert_instance("vllm", "heimdall", 0, state="idle", + model="M", url="http://h:8000") + registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "x", 300.0) + assert registry.all_instances()[0].state == "running" + + +def test_sweep_expired_allocations(registry): + # Register a running instance so idle-transition logic has something to act on. + registry.upsert_instance("vllm", "heimdall", 0, state="running", + model="M", url="http://h:8000") + # Create an allocation with a very short TTL (1 second). + alloc = registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "caller", ttl_s=1) + assert registry.active_allocations("vllm", "heimdall", 0) == 1 + + # Wait for TTL to elapse. + time.sleep(1.1) + + expired = registry.sweep_expired_allocations() + + # The allocation should have been swept. + assert alloc.allocation_id in expired + assert registry.active_allocations("vllm", "heimdall", 0) == 0 + + # The instance should have transitioned to idle since no allocations remain. + instance = registry.all_instances()[0] + assert instance.state == "idle" + assert instance.idle_since is not None