feat(resources): cf-orch GPU VRAM orchestration — Plan A core #1

Merged
pyr0ball merged 21 commits from feature/cforch-core-orchestration into main 2026-03-31 10:43:53 -07:00
37 changed files with 2094 additions and 1 deletions

4
.gitignore vendored
View file

@ -4,3 +4,7 @@ __pycache__/
*.egg-info/
dist/
.pytest_cache/
.superpowers/
# cf-orch private profiles (commit on personal/heimdall branch only)
circuitforge_core/resources/profiles/private/

View file

View file

@ -0,0 +1,60 @@
from __future__ import annotations
import logging
from typing import Any
from fastapi import FastAPI
from pydantic import BaseModel
from circuitforge_core.resources.agent.eviction_executor import EvictionExecutor
from circuitforge_core.resources.agent.gpu_monitor import GpuMonitor
logger = logging.getLogger(__name__)
class EvictRequest(BaseModel):
    """Request body for POST /evict."""
    # Target process ID on this node.
    pid: int
    # Seconds to wait after SIGTERM before the executor escalates to SIGKILL.
    grace_period_s: float = 5.0
def create_agent_app(
    node_id: str,
    monitor: GpuMonitor | None = None,
    executor: EvictionExecutor | None = None,
) -> FastAPI:
    """Build the FastAPI app for a node agent.

    Exposes /health, /gpu-info (current GPU inventory), and /evict
    (terminate a process by PID). The monitor and executor can be injected
    for testing; real instances are created when omitted.
    """
    gpu_monitor = monitor if monitor is not None else GpuMonitor()
    evictor = executor if executor is not None else EvictionExecutor()
    app = FastAPI(title=f"cf-orch-agent [{node_id}]")

    def _gpu_payload(g) -> dict[str, Any]:
        # Wire format consumed by the coordinator's agent poller.
        return {
            "gpu_id": g.gpu_id,
            "name": g.name,
            "vram_total_mb": g.vram_total_mb,
            "vram_used_mb": g.vram_used_mb,
            "vram_free_mb": g.vram_free_mb,
        }

    @app.get("/health")
    def health() -> dict[str, Any]:
        """Liveness probe; echoes the node id."""
        return {"status": "ok", "node_id": node_id}

    @app.get("/gpu-info")
    def gpu_info() -> dict[str, Any]:
        """Poll the GPU monitor and return the current inventory."""
        return {
            "node_id": node_id,
            "gpus": [_gpu_payload(g) for g in gpu_monitor.poll()],
        }

    @app.post("/evict")
    def evict(req: EvictRequest) -> dict[str, Any]:
        """Attempt to evict the requested PID and report the outcome."""
        outcome = evictor.evict_pid(pid=req.pid, grace_period_s=req.grace_period_s)
        return {
            "success": outcome.success,
            "method": outcome.method,
            "message": outcome.message,
        }

    return app

View file

@ -0,0 +1,85 @@
from __future__ import annotations
import logging
import os
import signal
import time
from dataclasses import dataclass
import psutil
logger = logging.getLogger(__name__)
_DEFAULT_GRACE_S = 5.0
@dataclass(frozen=True)
class EvictionResult:
    """Immutable outcome of one eviction attempt (EvictionExecutor.evict_pid)."""
    success: bool
    method: str  # "sigterm", "sigkill", "already_gone", "not_found", "error"
    message: str  # human-readable detail for logs / API responses
class EvictionExecutor:
    """Terminates processes by PID: SIGTERM, wait a grace period, then SIGKILL.

    Used by the node agent to evict VRAM-holding processes. Signals are sent
    with os.kill; liveness is checked via psutil.
    """

    def __init__(self, grace_period_s: float = _DEFAULT_GRACE_S) -> None:
        # Used when evict_pid is called without an explicit grace period.
        self._default_grace = grace_period_s

    @staticmethod
    def _pid_gone(pid: int) -> bool:
        """True if the PID no longer exists or is a zombie awaiting reap.

        psutil.pid_exists() reports zombies as alive, so a terminated child
        whose parent has not reaped it would otherwise look alive forever and
        force a pointless full grace-period wait plus SIGKILL escalation.
        """
        if not psutil.pid_exists(pid):
            return True
        try:
            return psutil.Process(pid).status() == psutil.STATUS_ZOMBIE
        except psutil.NoSuchProcess:
            return True
        except psutil.Error:
            # Status unavailable (e.g. access denied) — assume still alive.
            return False

    def evict_pid(
        self,
        pid: int,
        grace_period_s: float | None = None,
    ) -> EvictionResult:
        """Evict *pid*: SIGTERM, wait up to the grace period, escalate to SIGKILL.

        Never raises for the expected failure modes (unknown PID, permission
        denied, process exiting mid-flight) — the outcome is encoded in the
        returned EvictionResult.
        """
        grace = grace_period_s if grace_period_s is not None else self._default_grace
        if pid <= 0:
            # PID 0 / negative PIDs signal process groups — never do that here.
            return EvictionResult(
                success=False, method="error",
                message=f"Refusing to signal invalid PID {pid}"
            )
        if self._pid_gone(pid):
            return EvictionResult(
                success=False, method="not_found",
                message=f"PID {pid} not found"
            )
        try:
            os.kill(pid, signal.SIGTERM)
        except ProcessLookupError:
            # Exited between the existence check and the signal.
            return EvictionResult(
                success=True, method="already_gone",
                message=f"PID {pid} vanished before SIGTERM"
            )
        except PermissionError as exc:
            return EvictionResult(
                success=False, method="error",
                message=f"Permission denied terminating PID {pid}: {exc}"
            )
        # Wait for grace period
        deadline = time.monotonic() + grace
        while time.monotonic() < deadline:
            if self._pid_gone(pid):
                logger.info("PID %d exited cleanly after SIGTERM", pid)
                return EvictionResult(
                    success=True, method="sigterm",
                    message=f"PID {pid} exited after SIGTERM"
                )
            time.sleep(0.05)
        # Escalate to SIGKILL
        if not self._pid_gone(pid):
            try:
                os.kill(pid, signal.SIGKILL)
                logger.warning("PID %d required SIGKILL", pid)
                return EvictionResult(
                    success=True, method="sigkill",
                    message=f"PID {pid} killed with SIGKILL"
                )
            except ProcessLookupError:
                pass
            except PermissionError as exc:
                # SIGTERM was permitted but SIGKILL was not; previously this
                # propagated out of the agent endpoint uncaught.
                return EvictionResult(
                    success=False, method="error",
                    message=f"Permission denied killing PID {pid}: {exc}"
                )
        return EvictionResult(
            success=True, method="sigkill",
            message=f"PID {pid} is gone"
        )

View file

@ -0,0 +1,52 @@
from __future__ import annotations
import logging
import subprocess
from circuitforge_core.resources.models import GpuInfo
logger = logging.getLogger(__name__)
_NVIDIA_SMI_CMD = [
"nvidia-smi",
"--query-gpu=index,name,memory.total,memory.used,memory.free",
"--format=csv,noheader,nounits",
]
class GpuMonitor:
    """Queries the local GPU inventory via nvidia-smi.

    Any failure mode (binary missing, timeout, non-zero exit) degrades to an
    empty list, so callers treat "no GPUs" and "no driver" uniformly.
    """

    def poll(self) -> list[GpuInfo]:
        """Run nvidia-smi once and return the parsed GPU list (or [])."""
        try:
            proc = subprocess.run(
                _NVIDIA_SMI_CMD,
                capture_output=True,
                text=True,
                timeout=5,
            )
        except (FileNotFoundError, subprocess.TimeoutExpired) as exc:
            logger.warning("nvidia-smi unavailable: %s", exc)
            return []
        if proc.returncode != 0:
            logger.warning("nvidia-smi exited %d", proc.returncode)
            return []
        return self._parse(proc.stdout)

    def _parse(self, output: str) -> list[GpuInfo]:
        """Parse CSV rows (index,name,total,used,free) into GpuInfo records."""
        parsed: list[GpuInfo] = []
        for raw_line in output.strip().splitlines():
            fields = [piece.strip() for piece in raw_line.split(",")]
            if len(fields) != 5:
                # Blank or truncated row — nothing usable here.
                continue
            index, gpu_name, total, used, free = fields
            try:
                parsed.append(GpuInfo(
                    gpu_id=int(index),
                    name=gpu_name,
                    vram_total_mb=int(total),
                    vram_used_mb=int(used),
                    vram_free_mb=int(free),
                ))
            except ValueError:
                logger.debug("Skipping malformed nvidia-smi line: %r", raw_line)
        return parsed

View file

@ -0,0 +1,143 @@
from __future__ import annotations
import sys
from pathlib import Path
from typing import Annotated, Optional
import typer
import uvicorn
# Typer application exposing the cf-orch console commands
# (installed as the `cf-orch` entry point via [project.scripts]).
app = typer.Typer(name="cf-orch", help="CircuitForge GPU resource orchestrator")

# Destination for `cf-orch install-service` (writable only as root).
_SYSTEMD_UNIT_PATH = Path("/etc/systemd/system/cf-orch.service")

# Rendered with the current interpreter path so the unit runs in this venv.
_SYSTEMD_UNIT_TEMPLATE = """\
[Unit]
Description=CircuitForge GPU Resource Orchestrator
After=network.target
[Service]
Type=simple
ExecStart={python} -m circuitforge_core.resources.cli start
Restart=on-failure
RestartSec=5
[Install]
WantedBy=multi-user.target
"""
@app.command()
def start(
    profile: Annotated[Optional[Path], typer.Option(help="Profile YAML path")] = None,
    host: str = "0.0.0.0",
    port: int = 7700,
    agent_port: int = 7701,
) -> None:
    """Start the cf-orch coordinator (auto-detects GPU profile if not specified).

    Wires up the lease manager, profile registry and agent supervisor,
    registers locally detected GPUs under node id "local", then serves the
    coordinator API via uvicorn (blocking).

    NOTE(review): ``agent_port`` is accepted but never used in this function —
    confirm whether it should be forwarded anywhere.
    """
    # Deferred imports so bare `cf-orch --help` does not require the optional
    # [orch] server dependencies at module import time.
    from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
    from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
    from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor
    from circuitforge_core.resources.coordinator.app import create_coordinator_app
    from circuitforge_core.resources.agent.gpu_monitor import GpuMonitor

    lease_manager = LeaseManager()
    profile_registry = ProfileRegistry()
    supervisor = AgentSupervisor(lease_manager=lease_manager)

    # Register the coordinator host's own GPUs.
    monitor = GpuMonitor()
    gpus = monitor.poll()
    if not gpus:
        typer.echo(
            "Warning: no GPUs detected via nvidia-smi — coordinator running with 0 VRAM"
        )
    else:
        for gpu in gpus:
            lease_manager.register_gpu("local", gpu.gpu_id, gpu.vram_total_mb)
        typer.echo(f"Detected {len(gpus)} GPU(s)")

    if profile:
        active_profile = profile_registry.load(profile)
        typer.echo(f"Using profile: {active_profile.name} (from {profile})")
    else:
        # NOTE(review): with no GPUs, the fallback is list_public()[-1], whose
        # order comes from profile-file glob order — confirm this selects the
        # intended profile.
        active_profile = (
            profile_registry.auto_detect(gpus)
            if gpus
            else profile_registry.list_public()[-1]
        )
        typer.echo(f"Auto-selected profile: {active_profile.name}")

    # NOTE(review): ``active_profile`` is echoed but never passed to the app
    # below — the selected profile does not affect coordinator behavior here;
    # confirm whether it should be wired into create_coordinator_app.
    coordinator_app = create_coordinator_app(
        lease_manager=lease_manager,
        profile_registry=profile_registry,
        agent_supervisor=supervisor,
    )
    typer.echo(f"Starting cf-orch coordinator on {host}:{port}")
    uvicorn.run(coordinator_app, host=host, port=port)
@app.command()
def agent(
    coordinator: str = "http://localhost:7700",
    node_id: str = "local",
    host: str = "0.0.0.0",
    port: int = 7701,
) -> None:
    """Start a cf-orch node agent (for remote nodes like Navi, Huginn)."""
    # Deferred import: FastAPI stack only needed when actually serving.
    from circuitforge_core.resources.agent.app import create_agent_app
    # NOTE(review): ``coordinator`` is accepted but never used — the agent
    # does not register itself with the coordinator here; confirm whether
    # registration is deliberately deferred to a later phase.
    agent_app = create_agent_app(node_id=node_id)
    typer.echo(f"Starting cf-orch agent [{node_id}] on {host}:{port}")
    uvicorn.run(agent_app, host=host, port=port)
@app.command()
def status(coordinator: str = "http://localhost:7700") -> None:
    """Show GPU and lease status from the coordinator.

    Prints one section per node with per-GPU VRAM usage; exits non-zero if
    the coordinator cannot be reached or returns an error.
    """
    import httpx
    try:
        resp = httpx.get(f"{coordinator}/api/nodes", timeout=5.0)
        resp.raise_for_status()
        nodes = resp.json().get("nodes", [])
        for node in nodes:
            typer.echo(f"\nNode: {node['node_id']}")
            for gpu in node.get("gpus", []):
                # Separator between the GPU name and the usage figures was
                # missing, printing e.g. "RTX 40001024/8192 MB used".
                typer.echo(
                    f"  GPU {gpu['gpu_id']}: {gpu['name']} - "
                    f"{gpu['vram_used_mb']}/{gpu['vram_total_mb']} MB used"
                )
    except Exception as exc:
        # Broad catch is deliberate at this CLI boundary: any transport or
        # parse failure maps to one user-facing error and a non-zero exit.
        typer.echo(f"Coordinator unreachable at {coordinator}: {exc}", err=True)
        raise typer.Exit(1) from exc
@app.command("install-service")
def install_service(
    dry_run: bool = typer.Option(
        False, "--dry-run", help="Print unit file without writing"
    ),
) -> None:
    """Write a systemd unit file for cf-orch (requires root)."""
    # Render the unit with the current interpreter so it runs in this venv.
    unit_content = _SYSTEMD_UNIT_TEMPLATE.format(python=sys.executable)
    if dry_run:
        typer.echo(f"Would write to {_SYSTEMD_UNIT_PATH}:\n")
        typer.echo(unit_content)
        return
    try:
        _SYSTEMD_UNIT_PATH.write_text(unit_content)
    except PermissionError:
        typer.echo(
            f"Permission denied writing to {_SYSTEMD_UNIT_PATH}. Run as root.", err=True
        )
        raise typer.Exit(1)
    typer.echo(f"Written: {_SYSTEMD_UNIT_PATH}")
    typer.echo(
        "Run: sudo systemctl daemon-reload && sudo systemctl enable --now cf-orch"
    )
if __name__ == "__main__":
app()

View file

@ -0,0 +1,44 @@
# circuitforge_core/resources/compose.yml
# One-command cf-orch deployment for Docker self-hosters:
#   docker compose -f path/to/compose.yml up cf-orch-coordinator
services:
  cf-orch-coordinator:
    image: python:3.12-slim
    # Installs at container start — no custom image to build, at the cost of
    # slower cold starts and a network dependency on every restart.
    command: >
      sh -c "pip install 'circuitforge-core[orch]' &&
      cf-orch start --host 0.0.0.0 --port 7700"
    ports:
      - "7700:7700"
    volumes:
      # NOTE(review): host path /run/docker.sock relies on the usual
      # /var/run -> /run symlink; also, nothing in this change reads the
      # Docker socket — confirm the mount is actually needed.
      - /run/docker.sock:/var/run/docker.sock:ro
      - cf-orch-data:/data
    environment:
      - CFORCH_PROFILE=${CFORCH_PROFILE:-}
    restart: unless-stopped
    # NOTE(review): only GPU 0 is passed through — multi-GPU hosts need
    # extra /dev/nvidiaN entries (or nvidia-runtime device requests).
    devices:
      - /dev/nvidia0:/dev/nvidia0
      - /dev/nvidiactl:/dev/nvidiactl
    runtime: nvidia
  cf-orch-agent:
    image: python:3.12-slim
    command: >
      sh -c "pip install 'circuitforge-core[orch]' &&
      cf-orch agent --coordinator http://cf-orch-coordinator:7700
      --node-id ${CFORCH_NODE_ID:-local}
      --host 0.0.0.0 --port 7701"
    ports:
      - "7701:7701"
    depends_on:
      - cf-orch-coordinator
    environment:
      - CFORCH_NODE_ID=${CFORCH_NODE_ID:-local}
    restart: unless-stopped
    devices:
      - /dev/nvidia0:/dev/nvidia0
      - /dev/nvidiactl:/dev/nvidiactl
    runtime: nvidia
volumes:
  cf-orch-data:

View file

@ -0,0 +1,101 @@
from __future__ import annotations
import asyncio
import logging
import time
from dataclasses import dataclass, field
import httpx
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
from circuitforge_core.resources.models import GpuInfo, NodeInfo
logger = logging.getLogger(__name__)
_HEARTBEAT_INTERVAL_S = 10.0
_AGENT_TIMEOUT_S = 5.0
@dataclass
class AgentRecord:
    """Mutable per-node bookkeeping held by AgentSupervisor."""
    node_id: str
    agent_url: str
    # Wall-clock time (time.time()) of the last successful poll.
    last_seen: float = field(default_factory=time.time)
    # GPU inventory from the most recent successful /gpu-info poll.
    gpus: list[GpuInfo] = field(default_factory=list)
    # False until the first successful poll, and after any failed one.
    online: bool = False
class AgentSupervisor:
    """Registry and heartbeat poller for node agents.

    Polls each registered agent's /gpu-info endpoint, caches the result on
    its AgentRecord, and mirrors GPU capacities into the LeaseManager.
    Intended to run on a single asyncio event loop — the agent dict has no
    internal locking.
    """

    def __init__(self, lease_manager: LeaseManager) -> None:
        self._agents: dict[str, AgentRecord] = {}
        self._lease_manager = lease_manager
        # Set by run_heartbeat_loop(); cleared by stop().
        self._running = False

    def register(self, node_id: str, agent_url: str) -> None:
        """Track a new agent node.

        NOTE(review): re-registering an existing node_id is a no-op, so a
        changed agent_url is silently ignored — confirm this is intended.
        """
        if node_id not in self._agents:
            self._agents[node_id] = AgentRecord(node_id=node_id, agent_url=agent_url)
            logger.info("Registered agent node: %s @ %s", node_id, agent_url)

    def get_node_info(self, node_id: str) -> NodeInfo | None:
        """Snapshot of one node's cached state, or None if unknown."""
        record = self._agents.get(node_id)
        if record is None:
            return None
        return NodeInfo(
            node_id=record.node_id,
            agent_url=record.agent_url,
            gpus=record.gpus,
            last_heartbeat=record.last_seen,
        )

    def all_nodes(self) -> list[NodeInfo]:
        """Snapshots of every registered node (online or not)."""
        return [
            NodeInfo(
                node_id=r.node_id,
                agent_url=r.agent_url,
                gpus=r.gpus,
                last_heartbeat=r.last_seen,
            )
            for r in self._agents.values()
        ]

    async def poll_agent(self, node_id: str) -> bool:
        """Fetch /gpu-info from one agent and refresh its record.

        On success updates the cached GPUs, last_seen, online flag, and
        re-registers capacities with the lease manager; any failure just
        marks the node offline and returns False.
        """
        record = self._agents.get(node_id)
        if record is None:
            return False
        try:
            # A fresh client per poll keeps this simple; fine at this cadence.
            async with httpx.AsyncClient(timeout=_AGENT_TIMEOUT_S) as client:
                resp = await client.get(f"{record.agent_url}/gpu-info")
                resp.raise_for_status()
                data = resp.json()
            gpus = [
                GpuInfo(
                    gpu_id=g["gpu_id"],
                    name=g["name"],
                    vram_total_mb=g["vram_total_mb"],
                    vram_used_mb=g["vram_used_mb"],
                    vram_free_mb=g["vram_free_mb"],
                )
                for g in data.get("gpus", [])
            ]
            record.gpus = gpus
            record.last_seen = time.time()
            record.online = True
            # Keep the lease manager's capacity table in sync with reality.
            for gpu in gpus:
                self._lease_manager.register_gpu(node_id, gpu.gpu_id, gpu.vram_total_mb)
            return True
        except Exception as exc:
            # Broad on purpose: any transport/parse error just means offline.
            logger.warning("Agent %s unreachable: %s", node_id, exc)
            record.online = False
            return False

    async def poll_all(self) -> None:
        """Poll every registered agent concurrently.

        poll_agent swallows its own exceptions, so the gather cannot be
        cancelled by one failing node.
        """
        await asyncio.gather(*[self.poll_agent(nid) for nid in self._agents])

    async def run_heartbeat_loop(self) -> None:
        """Poll all agents every _HEARTBEAT_INTERVAL_S until stop() is called."""
        self._running = True
        while self._running:
            await self.poll_all()
            await asyncio.sleep(_HEARTBEAT_INTERVAL_S)

    def stop(self) -> None:
        # Takes effect after the current poll/sleep iteration completes.
        self._running = False

View file

@ -0,0 +1,129 @@
from __future__ import annotations
from typing import Any
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor
from circuitforge_core.resources.coordinator.eviction_engine import EvictionEngine
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
class LeaseRequest(BaseModel):
    """Request body for POST /api/leases."""
    node_id: str
    gpu_id: int
    # VRAM to reserve, in MB.
    mb: int
    # Name of the requesting service (e.g. "vllm").
    service: str
    # Numerically lower = more important: the lease manager only evicts
    # leases whose priority is strictly greater than the requester's.
    priority: int = 2
    # Lease time-to-live in seconds; 0.0 = no expiry.
    ttl_s: float = 0.0
def create_coordinator_app(
    lease_manager: LeaseManager,
    profile_registry: ProfileRegistry,
    agent_supervisor: AgentSupervisor,
) -> FastAPI:
    """Build the coordinator's FastAPI app.

    Routes expose node inventory, public profiles, and VRAM lease CRUD.
    Collaborators are injected so tests can substitute fakes; the
    EvictionEngine is constructed here on top of the shared LeaseManager.
    """
    eviction_engine = EvictionEngine(lease_manager=lease_manager)
    app = FastAPI(title="cf-orch-coordinator")

    @app.get("/api/health")
    def health() -> dict[str, Any]:
        """Liveness probe."""
        return {"status": "ok"}

    @app.get("/api/nodes")
    def get_nodes() -> dict[str, Any]:
        """All known nodes with their last-polled GPU inventory."""
        nodes = agent_supervisor.all_nodes()
        return {
            "nodes": [
                {
                    "node_id": n.node_id,
                    "agent_url": n.agent_url,
                    "last_heartbeat": n.last_heartbeat,
                    "gpus": [
                        {
                            "gpu_id": g.gpu_id,
                            "name": g.name,
                            "vram_total_mb": g.vram_total_mb,
                            "vram_used_mb": g.vram_used_mb,
                            "vram_free_mb": g.vram_free_mb,
                        }
                        for g in n.gpus
                    ],
                }
                for n in nodes
            ]
        }

    @app.get("/api/profiles")
    def get_profiles() -> dict[str, Any]:
        """Public (single-gpu-*) profiles only; CPU profiles are excluded
        by ProfileRegistry.list_public()."""
        return {
            "profiles": [
                {"name": p.name, "vram_total_mb": p.vram_total_mb}
                for p in profile_registry.list_public()
            ]
        }

    @app.get("/api/leases")
    def get_leases() -> dict[str, Any]:
        """Snapshot of every active lease."""
        return {
            "leases": [
                {
                    "lease_id": lease.lease_id,
                    "node_id": lease.node_id,
                    "gpu_id": lease.gpu_id,
                    "mb_granted": lease.mb_granted,
                    "holder_service": lease.holder_service,
                    "priority": lease.priority,
                    "expires_at": lease.expires_at,
                }
                for lease in lease_manager.all_leases()
            ]
        }

    @app.post("/api/leases")
    async def request_lease(req: LeaseRequest) -> dict[str, Any]:
        """Grant a VRAM lease, evicting lower-priority leases if needed.

        422: unknown node_id (never registered with the supervisor).
        503: neither free VRAM nor evictable leases could satisfy the request.
        """
        node_info = agent_supervisor.get_node_info(req.node_id)
        if node_info is None:
            raise HTTPException(
                status_code=422,
                detail=f"Unknown node_id {req.node_id!r} — node not registered",
            )
        agent_url = node_info.agent_url
        lease = await eviction_engine.request_lease(
            node_id=req.node_id,
            gpu_id=req.gpu_id,
            mb=req.mb,
            service=req.service,
            priority=req.priority,
            agent_url=agent_url,
            ttl_s=req.ttl_s,
        )
        if lease is None:
            raise HTTPException(
                status_code=503,
                detail="Insufficient VRAM — no eviction candidates available",
            )
        # Same wire shape as entries in GET /api/leases.
        return {
            "lease": {
                "lease_id": lease.lease_id,
                "node_id": lease.node_id,
                "gpu_id": lease.gpu_id,
                "mb_granted": lease.mb_granted,
                "holder_service": lease.holder_service,
                "priority": lease.priority,
                "expires_at": lease.expires_at,
            }
        }

    @app.delete("/api/leases/{lease_id}")
    async def release_lease(lease_id: str) -> dict[str, Any]:
        """Release a lease by id; 404 if it is not (or no longer) held."""
        released = await lease_manager.release(lease_id)
        if not released:
            raise HTTPException(status_code=404, detail=f"Lease {lease_id!r} not found")
        return {"released": True, "lease_id": lease_id}

    return app

View file

@ -0,0 +1,81 @@
from __future__ import annotations
import asyncio
import logging
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
from circuitforge_core.resources.models import VRAMLease
logger = logging.getLogger(__name__)
_DEFAULT_EVICTION_TIMEOUT_S = 10.0
class EvictionEngine:
    """Grants VRAM leases, evicting lower-priority leases when capacity is short.

    In this change (Plan A) "eviction" only releases lease accounting;
    actually terminating the process behind a lease is deferred to Plan B —
    see the stubs at the bottom.
    """

    def __init__(
        self,
        lease_manager: LeaseManager,
        eviction_timeout_s: float = _DEFAULT_EVICTION_TIMEOUT_S,
    ) -> None:
        # Shared accounting, also used directly by the coordinator API.
        self.lease_manager = lease_manager
        # Upper bound on how long request_lease retries after evicting.
        self._timeout = eviction_timeout_s

    async def request_lease(
        self,
        node_id: str,
        gpu_id: int,
        mb: int,
        service: str,
        priority: int,
        agent_url: str,
        ttl_s: float = 0.0,
    ) -> VRAMLease | None:
        """Grant `mb` on (node_id, gpu_id), evicting weaker leases if required.

        Returns the granted lease, or None when there is neither free VRAM nor
        enough lower-priority leases to evict before the timeout. `agent_url`
        is threaded through to the (currently stubbed) per-lease eviction.
        """
        # Fast path: enough free VRAM
        lease = await self.lease_manager.try_grant(
            node_id, gpu_id, mb, service, priority, ttl_s
        )
        if lease is not None:
            return lease
        # Find eviction candidates
        candidates = self.lease_manager.get_eviction_candidates(
            node_id=node_id, gpu_id=gpu_id,
            needed_mb=mb, requester_priority=priority,
        )
        if not candidates:
            logger.info(
                "No eviction candidates for %s on %s:GPU%d (%dMB needed)",
                service, node_id, gpu_id, mb,
            )
            return None
        # Evict candidates. freed_mb is informational only — the try_grant
        # retry below re-checks real capacity.
        freed_mb = sum(c.mb_granted for c in candidates)
        logger.info(
            "Evicting %d lease(s) to free %dMB for %s",
            len(candidates), freed_mb, service,
        )
        for candidate in candidates:
            await self._evict_lease(candidate, agent_url)
        # Wait for evictions to free up VRAM (poll with timeout). With the
        # Plan A accounting-only release the first retry normally succeeds;
        # the loop exists for Plan B, where real process exits take time.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + self._timeout
        while loop.time() < deadline:
            lease = await self.lease_manager.try_grant(
                node_id, gpu_id, mb, service, priority, ttl_s
            )
            if lease is not None:
                return lease
            await asyncio.sleep(0.1)
        logger.warning("Eviction timed out for %s after %.1fs", service, self._timeout)
        return None

    async def _evict_lease(self, lease: VRAMLease, agent_url: str) -> None:
        """Release lease accounting. Process-level eviction deferred to Plan B."""
        # agent_url is intentionally unused until Plan B wires process eviction.
        await self.lease_manager.release(lease.lease_id)

    async def _call_agent_evict(self, agent_url: str, lease: VRAMLease) -> bool:
        """POST /evict to the agent. Stub for v1 — real process lookup in Plan B."""
        # NOTE(review): currently unreferenced; kept as the Plan B hook.
        return True

View file

@ -0,0 +1,88 @@
from __future__ import annotations
import asyncio
from collections import defaultdict
from circuitforge_core.resources.models import VRAMLease
class LeaseManager:
    """In-memory VRAM accounting per (node_id, gpu_id).

    Grants and releases are serialized under one asyncio.Lock, so accounting
    stays consistent within a single event loop. Read-only helpers do not
    take the lock and may observe in-flight state.
    """

    def __init__(self) -> None:
        # lease_id -> active lease
        self._leases: dict[str, VRAMLease] = {}
        # (node_id, gpu_id) -> total capacity in MB
        self._gpu_total: dict[tuple[str, int], int] = {}
        # (node_id, gpu_id) -> MB currently granted
        self._gpu_used: dict[tuple[str, int], int] = defaultdict(int)
        self._lock = asyncio.Lock()

    def register_gpu(self, node_id: str, gpu_id: int, total_mb: int) -> None:
        """Record (or refresh) a GPU's total VRAM capacity."""
        self._gpu_total[(node_id, gpu_id)] = total_mb

    def gpu_total_mb(self, node_id: str, gpu_id: int) -> int:
        """Registered capacity in MB; 0 for an unknown GPU."""
        return self._gpu_total.get((node_id, gpu_id), 0)

    def used_mb(self, node_id: str, gpu_id: int) -> int:
        """MB currently granted on a GPU; 0 for an unknown GPU.

        Uses .get rather than defaultdict indexing so a read-only query does
        not insert a spurious zero entry.
        """
        return self._gpu_used.get((node_id, gpu_id), 0)

    def _reclaim_expired_locked(self) -> None:
        """Release every expired TTL lease. Caller must hold self._lock.

        Fixes a leak: leases created with ttl_s > 0 recorded an expires_at,
        but nothing ever reclaimed them, so their VRAM stayed "used" forever.
        """
        for lease in [l for l in self._leases.values() if l.is_expired()]:
            del self._leases[lease.lease_id]
            self._gpu_used[(lease.node_id, lease.gpu_id)] -= lease.mb_granted

    async def try_grant(
        self,
        node_id: str,
        gpu_id: int,
        mb: int,
        service: str,
        priority: int,
        ttl_s: float = 0.0,
    ) -> VRAMLease | None:
        """Grant a lease if free capacity allows; otherwise return None."""
        async with self._lock:
            # Reclaim expired TTL leases before checking capacity.
            self._reclaim_expired_locked()
            total = self._gpu_total.get((node_id, gpu_id), 0)
            used = self._gpu_used[(node_id, gpu_id)]
            if total - used < mb:
                return None
            lease = VRAMLease.create(
                gpu_id=gpu_id, node_id=node_id, mb=mb,
                service=service, priority=priority, ttl_s=ttl_s,
            )
            self._leases[lease.lease_id] = lease
            self._gpu_used[(node_id, gpu_id)] += mb
            return lease

    async def release(self, lease_id: str) -> bool:
        """Release a lease and return its VRAM; False if the id is unknown."""
        async with self._lock:
            lease = self._leases.pop(lease_id, None)
            if lease is None:
                return False
            self._gpu_used[(lease.node_id, lease.gpu_id)] -= lease.mb_granted
            return True

    def get_eviction_candidates(
        self,
        node_id: str,
        gpu_id: int,
        needed_mb: int,
        requester_priority: int,
    ) -> list[VRAMLease]:
        """Select weaker leases on the GPU whose eviction would free needed_mb.

        Only leases with a numerically higher (weaker) priority than the
        requester are eligible; the weakest are taken first. May return less
        than needed_mb worth when not enough candidates exist.
        """
        candidates = [
            lease for lease in self._leases.values()
            if lease.node_id == node_id
            and lease.gpu_id == gpu_id
            and lease.priority > requester_priority
        ]
        candidates.sort(key=lambda lease: lease.priority, reverse=True)
        selected: list[VRAMLease] = []
        freed = 0
        for candidate in candidates:
            selected.append(candidate)
            freed += candidate.mb_granted
            if freed >= needed_mb:
                break
        return selected

    def list_leases(
        self, node_id: str | None = None, gpu_id: int | None = None
    ) -> list[VRAMLease]:
        """Active leases, optionally filtered by node and/or GPU."""
        return [
            lease for lease in self._leases.values()
            if (node_id is None or lease.node_id == node_id)
            and (gpu_id is None or lease.gpu_id == gpu_id)
        ]

    def all_leases(self) -> list[VRAMLease]:
        """Snapshot of every active lease."""
        return list(self._leases.values())

View file

@ -0,0 +1,65 @@
# circuitforge_core/resources/coordinator/profile_registry.py
from __future__ import annotations
import logging
from pathlib import Path
from circuitforge_core.resources.models import GpuInfo
from circuitforge_core.resources.profiles.schema import GpuProfile, load_profile
_PUBLIC_DIR = Path(__file__).parent.parent / "profiles" / "public"
# VRAM thresholds for public profile selection (MB)
_PROFILE_THRESHOLDS = [
(22000, "single-gpu-24gb"),
(14000, "single-gpu-16gb"),
(8000, "single-gpu-8gb"),
(5500, "single-gpu-6gb"),
(3500, "single-gpu-4gb"),
(0, "single-gpu-2gb"),
]
_log = logging.getLogger(__name__)
class ProfileRegistry:
    """Loads GPU profiles from YAML and matches them to detected hardware."""

    def __init__(self, extra_dirs: list[Path] | None = None) -> None:
        self._profiles: dict[str, GpuProfile] = {}
        self._load_dir(_PUBLIC_DIR)
        for extra in extra_dirs or []:
            if extra.exists():
                self._load_dir(extra)

    def _load_dir(self, directory: Path) -> None:
        """Register every *.yaml profile in a directory, best-effort.

        One malformed profile must not break startup — it is logged and
        skipped.
        """
        for candidate in directory.glob("*.yaml"):
            try:
                loaded = load_profile(candidate)
            except Exception as exc:
                _log.warning("Skipping %s: %s", candidate, exc)
            else:
                self._profiles[loaded.name] = loaded

    def load(self, path: Path) -> GpuProfile:
        """Load a profile from an explicit path and register it by name."""
        loaded = load_profile(path)
        self._profiles[loaded.name] = loaded
        return loaded

    def list_public(self) -> list[GpuProfile]:
        # CPU profiles (cpu-*) are intentionally excluded — this endpoint
        # is used to match GPU hardware. CPU inference nodes self-select
        # their profile via the CLI and are not listed for lease matching.
        return [
            prof for prof in self._profiles.values()
            if prof.name.startswith("single-gpu-")
        ]

    def get(self, name: str) -> GpuProfile | None:
        return self._profiles.get(name)

    def auto_detect(self, gpus: list[GpuInfo]) -> GpuProfile:
        """Pick the largest registered profile whose VRAM floor fits GPU 0."""
        primary_vram = gpus[0].vram_total_mb if gpus else 0
        for floor_mb, candidate_name in _PROFILE_THRESHOLDS:
            if primary_vram < floor_mb:
                continue
            match = self._profiles.get(candidate_name)
            if match is not None:
                return match
        # Last resort; raises KeyError if even the smallest profile is absent.
        return self._profiles["single-gpu-2gb"]

View file

@ -0,0 +1,56 @@
from __future__ import annotations
import time
import uuid
from dataclasses import dataclass, field
@dataclass(frozen=True)
class VRAMLease:
    """Immutable record of one VRAM reservation on a single GPU."""
    lease_id: str
    gpu_id: int
    node_id: str
    mb_granted: int
    holder_service: str
    priority: int
    expires_at: float  # unix timestamp; 0.0 = no expiry

    @classmethod
    def create(
        cls,
        gpu_id: int,
        node_id: str,
        mb: int,
        service: str,
        priority: int,
        ttl_s: float = 0.0,
    ) -> VRAMLease:
        """Mint a lease with a fresh UUID; ttl_s <= 0 means no expiry."""
        if ttl_s > 0.0:
            expiry = time.time() + ttl_s
        else:
            expiry = 0.0
        return cls(
            lease_id=str(uuid.uuid4()),
            gpu_id=gpu_id,
            node_id=node_id,
            mb_granted=mb,
            holder_service=service,
            priority=priority,
            expires_at=expiry,
        )

    def is_expired(self) -> bool:
        """True only for TTL leases whose deadline has already passed."""
        return 0.0 < self.expires_at < time.time()
@dataclass(frozen=True)
class GpuInfo:
    """Point-in-time snapshot of one GPU, as reported by nvidia-smi."""
    # nvidia-smi index on its node (unique per node, not globally).
    gpu_id: int
    name: str
    vram_total_mb: int
    vram_used_mb: int
    vram_free_mb: int
@dataclass
class NodeInfo:
    """Read-model of a node returned by AgentSupervisor queries."""
    node_id: str
    # Base URL of the node's agent API (e.g. "http://host:7701").
    agent_url: str
    gpus: list[GpuInfo]
    # time.time() of the last successful agent poll.
    last_heartbeat: float = field(default_factory=time.time)

View file

@ -0,0 +1,33 @@
schema_version: 1
name: cpu-16gb
eviction_timeout_s: 30.0
services:
ollama:
max_mb: 0
priority: 1
cf-stt:
max_mb: 0
priority: 2
shared: true
max_concurrent: 1
backend: moonshine
cf-tts:
max_mb: 0
priority: 2
shared: true
max_concurrent: 1
cf-embed:
max_mb: 0
priority: 2
shared: true
max_concurrent: 2
always_on: true
cf-classify:
max_mb: 0
priority: 2
shared: true
max_concurrent: 2
always_on: true
model_size_hints:
llm_max_params: 3b-q4
image_gen_max: none

View file

@ -0,0 +1,33 @@
schema_version: 1
name: cpu-32gb
eviction_timeout_s: 30.0
services:
ollama:
max_mb: 0
priority: 1
cf-stt:
max_mb: 0
priority: 2
shared: true
max_concurrent: 2
backend: faster-whisper
cf-tts:
max_mb: 0
priority: 2
shared: true
max_concurrent: 2
cf-embed:
max_mb: 0
priority: 2
shared: true
max_concurrent: 4
always_on: true
cf-classify:
max_mb: 0
priority: 2
shared: true
max_concurrent: 4
always_on: true
model_size_hints:
llm_max_params: 7b-q4
image_gen_max: none

View file

@ -0,0 +1,45 @@
schema_version: 1
name: single-gpu-16gb
vram_total_mb: 16384
eviction_timeout_s: 10.0
services:
vllm:
max_mb: 12288
priority: 1
ollama:
max_mb: 12288
priority: 1
cf-vision:
max_mb: 3072
priority: 2
shared: true
max_concurrent: 4
cf-stt:
max_mb: 1200
priority: 2
shared: true
max_concurrent: 3
backend: parakeet-tdt
cf-tts:
max_mb: 1024
priority: 2
shared: true
max_concurrent: 3
cf-embed:
max_mb: 512
priority: 2
shared: true
max_concurrent: 6
always_on: true
cf-classify:
max_mb: 512
priority: 2
shared: true
max_concurrent: 6
always_on: true
comfyui:
max_mb: 14336
priority: 4
model_size_hints:
llm_max_params: 34b
image_gen_max: flux-dev-fp8

View file

@ -0,0 +1,45 @@
schema_version: 1
name: single-gpu-24gb
vram_total_mb: 24576
eviction_timeout_s: 10.0
services:
vllm:
max_mb: 20480
priority: 1
ollama:
max_mb: 18432
priority: 1
cf-vision:
max_mb: 4096
priority: 2
shared: true
max_concurrent: 6
cf-stt:
max_mb: 1200
priority: 2
shared: true
max_concurrent: 4
backend: parakeet-tdt
cf-tts:
max_mb: 1024
priority: 2
shared: true
max_concurrent: 4
cf-embed:
max_mb: 512
priority: 2
shared: true
max_concurrent: 8
always_on: true
cf-classify:
max_mb: 512
priority: 2
shared: true
max_concurrent: 8
always_on: true
comfyui:
max_mb: 20480
priority: 4
model_size_hints:
llm_max_params: 70b
image_gen_max: flux-dev-fp16

View file

@ -0,0 +1,22 @@
schema_version: 1
name: single-gpu-2gb
vram_total_mb: 2048
eviction_timeout_s: 15.0
services:
ollama:
max_mb: 1536
priority: 1
cf-vision:
max_mb: 512
priority: 2
shared: true
max_concurrent: 1
cf-stt:
max_mb: 200
priority: 2
shared: true
max_concurrent: 1
backend: moonshine
model_size_hints:
llm_max_params: 3b
image_gen_max: none

View file

@ -0,0 +1,30 @@
schema_version: 1
name: single-gpu-4gb
vram_total_mb: 4096
eviction_timeout_s: 15.0
services:
ollama:
max_mb: 3072
priority: 1
cf-vision:
max_mb: 1024
priority: 2
shared: true
max_concurrent: 1
cf-stt:
max_mb: 600
priority: 2
shared: true
max_concurrent: 1
backend: faster-whisper
cf-tts:
max_mb: 512
priority: 2
shared: true
max_concurrent: 1
comfyui:
max_mb: 3584
priority: 4
model_size_hints:
llm_max_params: 3b
image_gen_max: sd15-fp8

View file

@ -0,0 +1,33 @@
schema_version: 1
name: single-gpu-6gb
vram_total_mb: 6144
eviction_timeout_s: 10.0
services:
vllm:
max_mb: 4096
priority: 1
ollama:
max_mb: 3584
priority: 1
cf-vision:
max_mb: 1536
priority: 2
shared: true
max_concurrent: 2
cf-stt:
max_mb: 600
priority: 2
shared: true
max_concurrent: 2
backend: faster-whisper
cf-tts:
max_mb: 768
priority: 2
shared: true
max_concurrent: 1
comfyui:
max_mb: 5120
priority: 4
model_size_hints:
llm_max_params: 7b
image_gen_max: sd15

View file

@ -0,0 +1,33 @@
schema_version: 1
name: single-gpu-8gb
vram_total_mb: 8192
eviction_timeout_s: 10.0
services:
vllm:
max_mb: 5120
priority: 1
ollama:
max_mb: 4096
priority: 1
cf-vision:
max_mb: 2048
priority: 2
shared: true
max_concurrent: 3
cf-stt:
max_mb: 1200
priority: 2
shared: true
max_concurrent: 2
backend: parakeet-tdt
cf-tts:
max_mb: 1024
priority: 2
shared: true
max_concurrent: 2
comfyui:
max_mb: 6144
priority: 4
model_size_hints:
llm_max_params: 8b
image_gen_max: sdxl-fp8

View file

@ -0,0 +1,66 @@
# circuitforge_core/resources/profiles/schema.py
from __future__ import annotations
from pathlib import Path
from typing import Any
import yaml
from pydantic import BaseModel, Field
SUPPORTED_SCHEMA_VERSION = 1
class ServiceProfile(BaseModel):
    """Per-service resource policy within a GpuProfile."""
    # Maximum VRAM the service may hold, in MB (0 in the CPU profiles).
    max_mb: int
    # Numerically lower = more important; the lease manager evicts leases
    # with a strictly greater priority number first.
    priority: int
    # Presumably: multiple consumers share one loaded instance — confirm.
    shared: bool = False
    max_concurrent: int = 1
    # Presumably: keep resident rather than evict when idle — confirm;
    # not read by any code in this change.
    always_on: bool = False
    # Optional backend implementation hint (e.g. "faster-whisper").
    backend: str | None = None
    # NOTE(review): unreferenced in this change — presumably the services
    # allowed to call this one; confirm before relying on it.
    consumers: list[str] = Field(default_factory=list)
    # Instances are immutable after validation.
    model_config = {"frozen": True}
class GpuNodeEntry(BaseModel):
    """One GPU within a NodeProfile (multi-node profile schema).

    NOTE(review): not consumed by any code in this change — field meanings
    below are inferred from names; confirm against the private profiles.
    """
    id: int
    vram_mb: int
    role: str
    card: str = "unknown"
    always_on: bool = False
    services: list[str] = Field(default_factory=list)
    model_config = {"frozen": True}
class NodeProfile(BaseModel):
    """Per-node section of a multi-node profile (unused by public profiles)."""
    gpus: list[GpuNodeEntry]
    # Base URL of the node's agent, when it is a remote node.
    agent_url: str | None = None
    nas_mount: str | None = None
    model_config = {"frozen": True}
class GpuProfile(BaseModel):
    """Top-level profile document loaded from a YAML file."""
    # Must equal SUPPORTED_SCHEMA_VERSION (enforced by load_profile).
    schema_version: int
    name: str
    # Total VRAM the profile targets; None for CPU-only profiles.
    vram_total_mb: int | None = None
    eviction_timeout_s: float = 10.0
    # Service name -> its resource policy.
    services: dict[str, ServiceProfile] = Field(default_factory=dict)
    # Free-form sizing hints (e.g. llm_max_params); informational only here.
    model_size_hints: dict[str, str] = Field(default_factory=dict)
    # Multi-node topologies; empty for the public single-node profiles.
    nodes: dict[str, NodeProfile] = Field(default_factory=dict)
    model_config = {"frozen": True}
def load_profile(path: Path) -> GpuProfile:
    """Parse and validate a profile YAML file.

    Raises ValueError for non-mapping content or an unsupported
    schema_version; pydantic validation errors propagate unchanged.
    """
    parsed: Any = yaml.safe_load(path.read_text())
    if not isinstance(parsed, dict):
        raise ValueError(f"Profile file {path} must be a YAML mapping, got {type(parsed).__name__}")
    declared_version = parsed.get("schema_version")
    if declared_version == SUPPORTED_SCHEMA_VERSION:
        return GpuProfile.model_validate(parsed)
    raise ValueError(
        f"Unsupported schema_version {declared_version!r} in {path}. "
        f"Expected {SUPPORTED_SCHEMA_VERSION}."
    )

View file

@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "circuitforge-core"
version = "0.1.0"
version = "0.2.0"
description = "Shared scaffold for CircuitForge products"
requires-python = ">=3.11"
dependencies = [
@ -13,9 +13,29 @@ dependencies = [
"openai>=1.0",
]
[project.optional-dependencies]
orch = [
"fastapi>=0.110",
"uvicorn[standard]>=0.29",
"httpx>=0.27",
"pydantic>=2.0",
"typer[all]>=0.12",
"psutil>=5.9",
]
dev = [
"circuitforge-core[orch]",
"pytest>=8.0",
"pytest-asyncio>=0.23",
"httpx>=0.27",
]
[project.scripts]
cf-orch = "circuitforge_core.resources.cli:app"
[tool.setuptools.packages.find]
where = ["."]
include = ["circuitforge_core*"]
[tool.pytest.ini_options]
testpaths = ["tests"]
asyncio_mode = "auto"

View file

View file

@ -0,0 +1,68 @@
from __future__ import annotations
import pytest
from unittest.mock import MagicMock
from fastapi.testclient import TestClient
from circuitforge_core.resources.agent.app import create_agent_app
from circuitforge_core.resources.models import GpuInfo
from circuitforge_core.resources.agent.eviction_executor import EvictionResult
MOCK_GPUS = [
GpuInfo(
gpu_id=0,
name="RTX 4000",
vram_total_mb=8192,
vram_used_mb=1024,
vram_free_mb=7168,
),
]
@pytest.fixture
def agent_client():
    """TestClient over an agent app with mocked GPU monitor and executor."""
    mock_monitor = MagicMock()
    mock_monitor.poll.return_value = MOCK_GPUS
    mock_executor = MagicMock()
    app = create_agent_app(
        node_id="heimdall",
        monitor=mock_monitor,
        executor=mock_executor,
    )
    # Tuple so tests can assert against the mocks as well as the client.
    return TestClient(app), mock_monitor, mock_executor
def test_health_returns_ok(agent_client):
    """/health reports ok and echoes the node id."""
    client, _, _ = agent_client
    resp = client.get("/health")
    assert resp.status_code == 200
    assert resp.json()["status"] == "ok"
    assert resp.json()["node_id"] == "heimdall"
def test_gpu_info_returns_gpu_list(agent_client):
    """/gpu-info serializes the monitor's GPUs into the wire format."""
    client, _, _ = agent_client
    resp = client.get("/gpu-info")
    assert resp.status_code == 200
    data = resp.json()
    assert len(data["gpus"]) == 1
    assert data["gpus"][0]["gpu_id"] == 0
    assert data["gpus"][0]["name"] == "RTX 4000"
    assert data["gpus"][0]["vram_free_mb"] == 7168
def test_evict_calls_executor(agent_client):
    """/evict forwards pid and grace period to the executor and relays success."""
    client, _, mock_executor = agent_client
    mock_executor.evict_pid.return_value = EvictionResult(
        success=True, method="sigterm", message="done"
    )
    resp = client.post("/evict", json={"pid": 1234, "grace_period_s": 5.0})
    assert resp.status_code == 200
    assert resp.json()["success"] is True
    mock_executor.evict_pid.assert_called_once_with(pid=1234, grace_period_s=5.0)
def test_evict_requires_pid(agent_client):
    """A body without the required `pid` field fails pydantic validation (422)."""
    client, _, _ = agent_client
    resp = client.post("/evict", json={"grace_period_s": 5.0})
    assert resp.status_code == 422

View file

@ -0,0 +1,33 @@
from __future__ import annotations
from pathlib import Path
from unittest.mock import patch
from typer.testing import CliRunner
from circuitforge_core.resources.cli import app
runner = CliRunner()
def test_cli_help():
    """`cf-orch --help` exits cleanly and prints usage."""
    result = runner.invoke(app, ["--help"])
    assert result.exit_code == 0
    assert "cf-orch" in result.output.lower() or "Usage" in result.output
def test_status_command_shows_no_coordinator_message():
    """An unreachable coordinator yields a failure exit or an error message.

    NOTE(review): the assertion ORs three conditions, so it cannot tell a
    non-zero exit apart from a message-only failure — consider asserting
    both the exit code and the message explicitly.
    """
    with patch("httpx.get", side_effect=ConnectionRefusedError("refused")):
        result = runner.invoke(app, ["status"])
    assert result.exit_code != 0 or "unreachable" in result.output.lower() \
        or "coordinator" in result.output.lower()
def test_install_service_creates_systemd_unit(tmp_path: Path):
    """--dry-run prints the unit content without writing anything.

    NOTE(review): _SYSTEMD_UNIT_PATH is patched, but --dry-run never writes,
    so the patch is inert and nothing asserts unit_path stays absent.
    """
    unit_path = tmp_path / "cf-orch.service"
    with patch(
        "circuitforge_core.resources.cli._SYSTEMD_UNIT_PATH", unit_path
    ):
        result = runner.invoke(app, ["install-service", "--dry-run"])
    assert result.exit_code == 0
    assert "cf-orch.service" in result.output or "systemd" in result.output.lower()

View file

@ -0,0 +1,102 @@
import pytest
from unittest.mock import MagicMock
from fastapi.testclient import TestClient
from circuitforge_core.resources.coordinator.app import create_coordinator_app
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
from circuitforge_core.resources.models import GpuInfo, NodeInfo
@pytest.fixture
def coordinator_client():
    """TestClient wired to a coordinator with one registered 8 GiB GPU on 'heimdall'."""
    manager = LeaseManager()
    manager.register_gpu("heimdall", 0, 8192)

    gpu = GpuInfo(
        gpu_id=0,
        name="RTX 4000",
        vram_total_mb=8192,
        vram_used_mb=0,
        vram_free_mb=8192,
    )
    supervisor = MagicMock()
    supervisor.all_nodes.return_value = [
        NodeInfo(
            node_id="heimdall",
            agent_url="http://localhost:7701",
            gpus=[gpu],
            last_heartbeat=0.0,
        )
    ]
    supervisor.get_node_info.return_value = NodeInfo(
        node_id="heimdall",
        agent_url="http://localhost:7701",
        gpus=[],
        last_heartbeat=0.0,
    )

    app = create_coordinator_app(
        lease_manager=manager,
        profile_registry=ProfileRegistry(),
        agent_supervisor=supervisor,
    )
    return TestClient(app), manager
def test_health_returns_ok(coordinator_client):
    """GET /api/health reports status ok."""
    client, _ = coordinator_client
    response = client.get("/api/health")
    assert response.status_code == 200
    assert response.json()["status"] == "ok"


def test_get_nodes_returns_list(coordinator_client):
    """GET /api/nodes lists the single mocked node."""
    client, _ = coordinator_client
    response = client.get("/api/nodes")
    assert response.status_code == 200
    nodes = response.json()["nodes"]
    assert len(nodes) == 1
    assert nodes[0]["node_id"] == "heimdall"


def test_get_profiles_returns_public_profiles(coordinator_client):
    """GET /api/profiles includes the bundled 8 GiB profile."""
    client, _ = coordinator_client
    response = client.get("/api/profiles")
    assert response.status_code == 200
    profile_names = [profile["name"] for profile in response.json()["profiles"]]
    assert "single-gpu-8gb" in profile_names


def test_post_lease_grants_lease(coordinator_client):
    """POST /api/leases grants the requested VRAM and returns a lease id."""
    client, _ = coordinator_client
    response = client.post(
        "/api/leases",
        json={
            "node_id": "heimdall",
            "gpu_id": 0,
            "mb": 2048,
            "service": "peregrine",
            "priority": 1,
        },
    )
    assert response.status_code == 200
    lease = response.json()["lease"]
    assert lease["mb_granted"] == 2048
    assert lease["holder_service"] == "peregrine"
    assert "lease_id" in lease


def test_delete_lease_releases_it(coordinator_client):
    """DELETE /api/leases/{id} releases a previously granted lease."""
    client, _ = coordinator_client
    grant = client.post(
        "/api/leases",
        json={
            "node_id": "heimdall",
            "gpu_id": 0,
            "mb": 2048,
            "service": "peregrine",
            "priority": 1,
        },
    )
    lease_id = grant.json()["lease"]["lease_id"]
    release = client.delete(f"/api/leases/{lease_id}")
    assert release.status_code == 200
    assert release.json()["released"] is True


def test_delete_unknown_lease_returns_404(coordinator_client):
    """Releasing an unknown lease id yields 404."""
    client, _ = coordinator_client
    response = client.delete("/api/leases/nonexistent-id")
    assert response.status_code == 404


def test_get_leases_returns_active_leases(coordinator_client):
    """GET /api/leases reflects a just-granted lease."""
    client, _ = coordinator_client
    client.post(
        "/api/leases",
        json={
            "node_id": "heimdall",
            "gpu_id": 0,
            "mb": 1024,
            "service": "kiwi",
            "priority": 2,
        },
    )
    response = client.get("/api/leases")
    assert response.status_code == 200
    assert len(response.json()["leases"]) == 1

View file

@ -0,0 +1,67 @@
import asyncio
import pytest
from unittest.mock import AsyncMock, patch
from circuitforge_core.resources.coordinator.eviction_engine import EvictionEngine
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
@pytest.fixture
def lease_manager():
    """LeaseManager with a single 8 GiB GPU registered on 'heimdall'."""
    manager = LeaseManager()
    manager.register_gpu("heimdall", 0, 8192)
    return manager


@pytest.fixture
def engine(lease_manager):
    """EvictionEngine over the fixture manager, with a short timeout for tests."""
    return EvictionEngine(lease_manager=lease_manager, eviction_timeout_s=0.1)


@pytest.mark.asyncio
async def test_request_lease_grants_when_vram_available(engine, lease_manager):
    """A request that fits in free VRAM is granted outright."""
    granted = await engine.request_lease(
        node_id="heimdall",
        gpu_id=0,
        mb=4096,
        service="peregrine",
        priority=1,
        agent_url="http://localhost:7701",
    )
    assert granted is not None
    assert granted.mb_granted == 4096
@pytest.mark.asyncio
async def test_request_lease_evicts_and_grants(engine, lease_manager):
    """Eviction path: a lower-priority lease is evicted, then the request is granted.

    The agent eviction RPC is mocked out; the actual release of the low-priority
    lease (which the real agent would cause) is simulated by scheduling
    ``lease_manager.release`` shortly after the request starts waiting.
    """
    # Pre-fill with a low-priority lease
    big_lease = await lease_manager.try_grant(
        "heimdall", 0, 7000, "comfyui", priority=4
    )
    assert big_lease is not None
    # Mock the agent eviction call
    with patch(
        "circuitforge_core.resources.coordinator.eviction_engine.EvictionEngine._call_agent_evict",
        new_callable=AsyncMock,
    ) as mock_evict:
        mock_evict.return_value = True
        # Simulate the comfyui lease being released (as if the agent evicted it).
        # get_running_loop() is the correct call inside a coroutine;
        # get_event_loop() is deprecated in that context since Python 3.10.
        asyncio.get_running_loop().call_later(
            0.05,
            lambda: asyncio.ensure_future(lease_manager.release(big_lease.lease_id)),
        )
        lease = await engine.request_lease(
            node_id="heimdall",
            gpu_id=0,
            mb=4096,
            service="peregrine",
            priority=1,
            agent_url="http://localhost:7701",
        )
    assert lease is not None
    assert lease.holder_service == "peregrine"
@pytest.mark.asyncio
async def test_request_lease_returns_none_when_no_eviction_candidates(engine):
    """With only equal-or-higher-priority holders, the request cannot be satisfied."""
    await engine.lease_manager.try_grant("heimdall", 0, 6000, "vllm", priority=1)
    # Requesting 4GB but no lower-priority leases exist
    denied = await engine.request_lease(
        node_id="heimdall",
        gpu_id=0,
        mb=4096,
        service="kiwi",
        priority=2,
        agent_url="http://localhost:7701",
    )
    assert denied is None

View file

@ -0,0 +1,43 @@
import signal
from unittest.mock import patch, call
import pytest
from circuitforge_core.resources.agent.eviction_executor import EvictionExecutor, EvictionResult
def test_evict_by_pid_sends_sigterm_then_sigkill():
    """If the process survives the grace period, SIGKILL follows SIGTERM."""
    executor = EvictionExecutor(grace_period_s=0.01)
    # pid_exists always True → grace period expires → SIGKILL fires
    with patch("os.kill") as mock_kill, patch(
        "circuitforge_core.resources.agent.eviction_executor.psutil"
    ) as mock_psutil:
        mock_psutil.pid_exists.return_value = True
        result = executor.evict_pid(pid=1234, grace_period_s=0.01)
    assert result.success is True
    sent = mock_kill.call_args_list
    assert call(1234, signal.SIGTERM) in sent
    assert call(1234, signal.SIGKILL) in sent


def test_evict_pid_succeeds_on_sigterm_alone():
    """A process that exits during the grace period is reported as 'sigterm'."""
    executor = EvictionExecutor(grace_period_s=0.1)
    with patch("os.kill"), patch(
        "circuitforge_core.resources.agent.eviction_executor.psutil"
    ) as mock_psutil:
        mock_psutil.pid_exists.side_effect = [True, False]  # gone after SIGTERM
        result = executor.evict_pid(pid=5678, grace_period_s=0.01)
    assert result.success is True
    assert result.method == "sigterm"


def test_evict_pid_not_found_returns_failure():
    """Evicting a nonexistent pid fails with a 'not found' message."""
    executor = EvictionExecutor()
    with patch(
        "circuitforge_core.resources.agent.eviction_executor.psutil"
    ) as mock_psutil:
        mock_psutil.pid_exists.return_value = False
        result = executor.evict_pid(pid=9999)
    assert result.success is False
    assert "not found" in result.message.lower()


def test_eviction_result_is_immutable():
    """EvictionResult is a frozen dataclass; attribute assignment must raise."""
    result = EvictionResult(success=True, method="sigterm", message="ok")
    with pytest.raises((AttributeError, TypeError)):
        result.success = False  # type: ignore

View file

@ -0,0 +1,60 @@
from unittest.mock import patch
from circuitforge_core.resources.agent.gpu_monitor import GpuMonitor
# Canned two-GPU nvidia-smi CSV output; columns are:
# index, name, total MB, used MB, free MB (matching the query GpuMonitor issues).
SAMPLE_NVIDIA_SMI_OUTPUT = (
    "0, Quadro RTX 4000, 8192, 6843, 1349\n"
    "1, Quadro RTX 4000, 8192, 721, 7471\n"
)
def test_parse_returns_list_of_gpu_info():
    """Well-formed nvidia-smi CSV output maps to GpuInfo records."""
    monitor = GpuMonitor()
    with patch(
        "circuitforge_core.resources.agent.gpu_monitor.subprocess.run"
    ) as mock_run:
        mock_run.return_value.returncode = 0
        mock_run.return_value.stdout = SAMPLE_NVIDIA_SMI_OUTPUT
        gpus = monitor.poll()
    assert len(gpus) == 2
    first = gpus[0]
    assert first.gpu_id == 0
    assert first.name == "Quadro RTX 4000"
    assert first.vram_total_mb == 8192
    assert first.vram_used_mb == 6843
    assert first.vram_free_mb == 1349


def test_parse_second_gpu():
    """The second CSV row populates the second GpuInfo."""
    monitor = GpuMonitor()
    with patch(
        "circuitforge_core.resources.agent.gpu_monitor.subprocess.run"
    ) as mock_run:
        mock_run.return_value.returncode = 0
        mock_run.return_value.stdout = SAMPLE_NVIDIA_SMI_OUTPUT
        gpus = monitor.poll()
    second = gpus[1]
    assert second.gpu_id == 1
    assert second.vram_used_mb == 721
    assert second.vram_free_mb == 7471


def test_poll_returns_empty_list_when_nvidia_smi_unavailable():
    """A missing nvidia-smi binary degrades to an empty GPU list."""
    monitor = GpuMonitor()
    with patch(
        "circuitforge_core.resources.agent.gpu_monitor.subprocess.run",
        side_effect=FileNotFoundError,
    ):
        assert monitor.poll() == []


def test_poll_returns_empty_list_on_nonzero_exit():
    """A failing nvidia-smi invocation degrades to an empty GPU list."""
    monitor = GpuMonitor()
    with patch(
        "circuitforge_core.resources.agent.gpu_monitor.subprocess.run"
    ) as mock_run:
        mock_run.return_value.returncode = 1
        mock_run.return_value.stdout = ""
        assert monitor.poll() == []


def test_poll_skips_malformed_lines():
    """Rows with non-numeric fields are dropped; valid rows survive."""
    monitor = GpuMonitor()
    malformed = "0, RTX 4000, 8192, not_a_number, 1024\n1, RTX 4000, 8192, 512, 7680\n"
    with patch(
        "circuitforge_core.resources.agent.gpu_monitor.subprocess.run"
    ) as mock_run:
        mock_run.return_value.returncode = 0
        mock_run.return_value.stdout = malformed
        gpus = monitor.poll()
    assert len(gpus) == 1
    assert gpus[0].gpu_id == 1

View file

@ -0,0 +1,219 @@
"""Integration test: full lease → eviction → re-grant cycle.
Runs coordinator in-process (no subprocesses, no real nvidia-smi).
Uses TestClient for HTTP, mocks AgentSupervisor to return fixed node state.
"""
import pytest
from unittest.mock import MagicMock
from fastapi.testclient import TestClient
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor
from circuitforge_core.resources.coordinator.app import create_coordinator_app
from circuitforge_core.resources.models import GpuInfo, NodeInfo
@pytest.fixture
def system():
    """Create an in-process coordinator system with 8GB GPU and mock supervisor."""
    manager = LeaseManager()
    manager.register_gpu("local", 0, 8192)

    gpu = GpuInfo(
        gpu_id=0,
        name="RTX 4000",
        vram_total_mb=8192,
        vram_used_mb=0,
        vram_free_mb=8192,
    )
    supervisor = MagicMock(spec=AgentSupervisor)
    supervisor.all_nodes.return_value = [
        NodeInfo(
            node_id="local",
            agent_url="http://localhost:7701",
            gpus=[gpu],
            last_heartbeat=0.0,
        )
    ]
    supervisor.get_node_info.return_value = NodeInfo(
        node_id="local",
        agent_url="http://localhost:7701",
        gpus=[],
        last_heartbeat=0.0,
    )

    app = create_coordinator_app(
        lease_manager=manager,
        profile_registry=ProfileRegistry(),
        agent_supervisor=supervisor,
    )
    return TestClient(app), manager
def test_full_lease_cycle(system):
    """Test: grant, verify, release, verify gone."""
    client, _ = system

    # Grant a lease
    grant = client.post(
        "/api/leases",
        json={
            "node_id": "local",
            "gpu_id": 0,
            "mb": 4096,
            "service": "peregrine",
            "priority": 1,
        },
    )
    assert grant.status_code == 200
    lease = grant.json()["lease"]
    lease_id = lease["lease_id"]
    assert lease["mb_granted"] == 4096
    assert lease["holder_service"] == "peregrine"

    # Verify it appears in active leases
    listing = client.get("/api/leases")
    assert listing.status_code == 200
    assert any(item["lease_id"] == lease_id for item in listing.json()["leases"])

    # Release it
    release = client.delete(f"/api/leases/{lease_id}")
    assert release.status_code == 200
    assert release.json()["released"] is True

    # Verify it's gone
    listing = client.get("/api/leases")
    assert listing.status_code == 200
    assert all(item["lease_id"] != lease_id for item in listing.json()["leases"])


def test_vram_exhaustion_returns_503(system):
    """Test: fill GPU, then request with no eviction candidates returns 503."""
    client, _ = system

    # Fill GPU 0 with high-priority lease
    fill = client.post(
        "/api/leases",
        json={
            "node_id": "local",
            "gpu_id": 0,
            "mb": 8000,
            "service": "vllm",
            "priority": 1,
        },
    )
    assert fill.status_code == 200

    # Try to get more VRAM with same priority (no eviction candidates)
    overflow = client.post(
        "/api/leases",
        json={
            "node_id": "local",
            "gpu_id": 0,
            "mb": 2000,
            "service": "kiwi",
            "priority": 1,
        },
    )
    assert overflow.status_code == 503
    assert "Insufficient VRAM" in overflow.json()["detail"]


def test_auto_detect_profile_for_8gb():
    """Test: ProfileRegistry auto-detects single-gpu-8gb for 8GB GPU."""
    registry = ProfileRegistry()
    gpu = GpuInfo(
        gpu_id=0,
        name="RTX 4000",
        vram_total_mb=8192,
        vram_used_mb=0,
        vram_free_mb=8192,
    )
    detected = registry.auto_detect([gpu])
    assert detected.name == "single-gpu-8gb"
    # Verify profile has services configured
    assert hasattr(detected, "services")
def test_node_endpoint_shows_nodes(system):
    """Test: GET /api/nodes returns the mocked node."""
    client, _ = system
    response = client.get("/api/nodes")
    assert response.status_code == 200
    nodes = response.json()["nodes"]
    assert len(nodes) == 1
    node = nodes[0]
    assert node["node_id"] == "local"
    assert node["agent_url"] == "http://localhost:7701"
    assert len(node["gpus"]) == 1
    assert node["gpus"][0]["name"] == "RTX 4000"


def test_profiles_endpoint_returns_public_profiles(system):
    """Test: GET /api/profiles returns standard public profiles."""
    client, _ = system
    response = client.get("/api/profiles")
    assert response.status_code == 200
    names = [profile["name"] for profile in response.json()["profiles"]]
    # Verify common public profiles are present
    for expected in ("single-gpu-8gb", "single-gpu-6gb", "single-gpu-2gb"):
        assert expected in names


def test_multiple_leases_tracked_independently(system):
    """Test: multiple active leases are tracked correctly."""
    client, _ = system

    def grant(service):
        # Local helper: grant a 2048 MB, priority-2 lease for `service`.
        resp = client.post(
            "/api/leases",
            json={
                "node_id": "local",
                "gpu_id": 0,
                "mb": 2048,
                "service": service,
                "priority": 2,
            },
        )
        assert resp.status_code == 200
        return resp.json()["lease"]["lease_id"]

    lease1_id = grant("peregrine")
    lease2_id = grant("kiwi")

    # Both should be in active leases
    leases = client.get("/api/leases").json()["leases"]
    ids = [item["lease_id"] for item in leases]
    assert lease1_id in ids
    assert lease2_id in ids
    assert len(leases) == 2

    # Release lease 1
    assert client.delete(f"/api/leases/{lease1_id}").status_code == 200

    # Only lease 2 should remain
    leases = client.get("/api/leases").json()["leases"]
    ids = [item["lease_id"] for item in leases]
    assert lease1_id not in ids
    assert lease2_id in ids
    assert len(leases) == 1


def test_delete_nonexistent_lease_returns_404(system):
    """Test: deleting a nonexistent lease returns 404."""
    client, _ = system
    response = client.delete("/api/leases/nonexistent-lease-id")
    assert response.status_code == 404
    assert "not found" in response.json()["detail"]


def test_health_endpoint_returns_ok(system):
    """Test: GET /api/health returns status ok."""
    client, _ = system
    response = client.get("/api/health")
    assert response.status_code == 200
    assert response.json()["status"] == "ok"

View file

@ -0,0 +1,85 @@
import pytest
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
@pytest.fixture
def mgr():
    """LeaseManager with one 8 GiB GPU registered on node 'heimdall'."""
    manager = LeaseManager()
    manager.register_gpu(node_id="heimdall", gpu_id=0, total_mb=8192)
    return manager


@pytest.mark.asyncio
async def test_grant_succeeds_when_vram_available(mgr):
    """A request that fits in free VRAM is granted at the asked-for size."""
    lease = await mgr.try_grant(
        node_id="heimdall", gpu_id=0, mb=4096, service="peregrine", priority=1
    )
    assert lease is not None
    assert lease.mb_granted == 4096
    assert lease.node_id == "heimdall"
    assert lease.gpu_id == 0


@pytest.mark.asyncio
async def test_grant_fails_when_vram_insufficient(mgr):
    """A request exceeding the remaining free VRAM is refused."""
    await mgr.try_grant(
        node_id="heimdall", gpu_id=0, mb=7000, service="vllm", priority=1
    )
    denied = await mgr.try_grant(
        node_id="heimdall", gpu_id=0, mb=2000, service="kiwi", priority=2
    )
    assert denied is None


@pytest.mark.asyncio
async def test_release_frees_vram(mgr):
    """Releasing a lease makes its VRAM grantable again."""
    first = await mgr.try_grant(
        node_id="heimdall", gpu_id=0, mb=7000, service="vllm", priority=1
    )
    assert first is not None
    assert await mgr.release(first.lease_id) is True
    second = await mgr.try_grant(
        node_id="heimdall", gpu_id=0, mb=7000, service="comfyui", priority=4
    )
    assert second is not None


@pytest.mark.asyncio
async def test_release_unknown_lease_returns_false(mgr):
    """Releasing an unknown lease id is a no-op that returns False."""
    assert await mgr.release("nonexistent-id") is False


@pytest.mark.asyncio
async def test_get_eviction_candidates_returns_lower_priority_leases(mgr):
    """Only holders with a larger priority number (lower priority) are candidates."""
    await mgr.try_grant(
        node_id="heimdall", gpu_id=0, mb=3000, service="comfyui", priority=4
    )
    await mgr.try_grant(
        node_id="heimdall", gpu_id=0, mb=2000, service="ollama", priority=1
    )
    candidates = mgr.get_eviction_candidates(
        node_id="heimdall", gpu_id=0, needed_mb=3000, requester_priority=2
    )
    assert len(candidates) == 1
    assert candidates[0].holder_service == "comfyui"


@pytest.mark.asyncio
async def test_list_leases_for_gpu(mgr):
    """All active leases on a GPU are listed."""
    await mgr.try_grant(
        node_id="heimdall", gpu_id=0, mb=1024, service="peregrine", priority=1
    )
    await mgr.try_grant(
        node_id="heimdall", gpu_id=0, mb=512, service="kiwi", priority=2
    )
    assert len(mgr.list_leases(node_id="heimdall", gpu_id=0)) == 2


def test_register_gpu_sets_total(mgr):
    """register_gpu records the GPU's total VRAM."""
    assert mgr.gpu_total_mb("heimdall", 0) == 8192


@pytest.mark.asyncio
async def test_used_mb_tracks_grants():
    """used_mb sums the VRAM of all active grants."""
    manager = LeaseManager()
    manager.register_gpu("heimdall", 0, 8192)
    await manager.try_grant("heimdall", 0, 3000, "a", 1)
    await manager.try_grant("heimdall", 0, 2000, "b", 2)
    assert manager.used_mb("heimdall", 0) == 5000

View file

@ -0,0 +1,47 @@
import time
import pytest
from circuitforge_core.resources.models import VRAMLease, GpuInfo, NodeInfo
def test_vram_lease_create_assigns_unique_ids():
    """Each VRAMLease.create call yields a distinct lease_id."""
    first = VRAMLease.create(
        gpu_id=0, node_id="heimdall", mb=4096, service="peregrine", priority=1
    )
    second = VRAMLease.create(
        gpu_id=0, node_id="heimdall", mb=4096, service="peregrine", priority=1
    )
    assert first.lease_id != second.lease_id


def test_vram_lease_create_with_ttl_sets_expiry():
    """Passing ttl_s sets expires_at to creation time plus the ttl."""
    before = time.time()
    lease = VRAMLease.create(
        gpu_id=0, node_id="heimdall", mb=2048, service="kiwi", priority=2, ttl_s=60.0
    )
    after = time.time()
    assert before + 60.0 <= lease.expires_at <= after + 60.0


def test_vram_lease_create_no_ttl_has_zero_expiry():
    """Without a ttl, expires_at defaults to 0.0."""
    lease = VRAMLease.create(
        gpu_id=0, node_id="heimdall", mb=1024, service="snipe", priority=2
    )
    assert lease.expires_at == 0.0


def test_vram_lease_is_immutable():
    """VRAMLease is frozen; attribute assignment must raise."""
    lease = VRAMLease.create(
        gpu_id=0, node_id="heimdall", mb=1024, service="snipe", priority=2
    )
    with pytest.raises((AttributeError, TypeError)):
        lease.mb_granted = 999  # type: ignore


def test_gpu_info_fields():
    """GpuInfo exposes the VRAM accounting fields it was built with."""
    info = GpuInfo(
        gpu_id=0,
        name="RTX 4000",
        vram_total_mb=8192,
        vram_used_mb=2048,
        vram_free_mb=6144,
    )
    assert info.vram_free_mb == 6144


def test_node_info_fields():
    """NodeInfo carries its id and the GPUs attached to the node."""
    gpu = GpuInfo(
        gpu_id=0,
        name="RTX 4000",
        vram_total_mb=8192,
        vram_used_mb=0,
        vram_free_mb=8192,
    )
    node = NodeInfo(
        node_id="heimdall",
        agent_url="http://localhost:7701",
        gpus=[gpu],
        last_heartbeat=time.time(),
    )
    assert node.node_id == "heimdall"
    assert len(node.gpus) == 1

View file

@ -0,0 +1,101 @@
# tests/test_resources/test_profile_registry.py
import pytest
from unittest.mock import MagicMock
from circuitforge_core.resources.profiles.schema import (
GpuProfile, ServiceProfile, load_profile
)
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
def test_load_8gb_profile(tmp_path):
    """load_profile parses a full profile YAML: header fields plus per-service limits."""
    # NOTE(review): YAML relies on significant indentation; the nesting below
    # (services -> service name -> keys) was reconstructed — confirm it matches
    # the committed file, since the diff view stripped leading whitespace.
    yaml_content = """
schema_version: 1
name: single-gpu-8gb
vram_total_mb: 8192
eviction_timeout_s: 10.0
services:
  vllm:
    max_mb: 5120
    priority: 1
  cf-vision:
    max_mb: 2048
    priority: 2
    shared: true
    max_concurrent: 3
"""
    profile_file = tmp_path / "test.yaml"
    profile_file.write_text(yaml_content)
    profile = load_profile(profile_file)
    # Header fields round-trip
    assert profile.name == "single-gpu-8gb"
    assert profile.schema_version == 1
    assert profile.vram_total_mb == 8192
    assert profile.eviction_timeout_s == 10.0
    # Per-service limits round-trip
    assert "vllm" in profile.services
    assert profile.services["vllm"].max_mb == 5120
    assert profile.services["vllm"].priority == 1
    assert profile.services["cf-vision"].shared is True
    assert profile.services["cf-vision"].max_concurrent == 3
def test_load_profile_rejects_wrong_schema_version(tmp_path):
    """Unknown schema_version values are rejected with a ValueError."""
    profile_file = tmp_path / "future.yaml"
    profile_file.write_text("schema_version: 99\nname: future\n")
    with pytest.raises(ValueError, match="schema_version"):
        load_profile(profile_file)


def test_service_profile_defaults():
    """ServiceProfile optional fields default to non-shared, single-slot, off."""
    svc = ServiceProfile(max_mb=1024, priority=2)
    assert svc.shared is False
    assert svc.max_concurrent == 1
    assert svc.always_on is False
    assert svc.backend is None
    assert svc.consumers == []


def test_profile_registry_loads_public_profiles():
    """The registry ships with the three bundled single-GPU profiles."""
    registry = ProfileRegistry()
    names = [profile.name for profile in registry.list_public()]
    for expected in ("single-gpu-8gb", "single-gpu-6gb", "single-gpu-2gb"):
        assert expected in names


def test_profile_registry_auto_detect_selects_8gb():
    """An 8192 MB GPU maps to the single-gpu-8gb profile."""
    registry = ProfileRegistry()
    detected = registry.auto_detect([MagicMock(vram_total_mb=8192)])
    assert detected.name == "single-gpu-8gb"


def test_profile_registry_auto_detect_selects_6gb():
    """A 6144 MB GPU maps to the single-gpu-6gb profile."""
    registry = ProfileRegistry()
    detected = registry.auto_detect([MagicMock(vram_total_mb=6144)])
    assert detected.name == "single-gpu-6gb"


def test_profile_registry_auto_detect_selects_2gb():
    """A 2048 MB GPU maps to the single-gpu-2gb profile."""
    registry = ProfileRegistry()
    detected = registry.auto_detect([MagicMock(vram_total_mb=2048)])
    assert detected.name == "single-gpu-2gb"
def test_profile_registry_load_from_path(tmp_path):
yaml_content = (
"schema_version: 1\nname: custom\n"
"vram_total_mb: 12288\neviction_timeout_s: 5.0\n"
)
p = tmp_path / "custom.yaml"
p.write_text(yaml_content)
registry = ProfileRegistry()
profile = registry.load(p)
assert profile.name == "custom"
assert profile.vram_total_mb == 12288