Merge pull request 'feat(resources): cf-orch GPU VRAM orchestration — Plan A core' (#1) from feature/cforch-core-orchestration into main
This commit is contained in:
commit
99f4e95018
37 changed files with 2094 additions and 1 deletions
4
.gitignore
vendored
4
.gitignore
vendored
|
|
@ -4,3 +4,7 @@ __pycache__/
|
|||
*.egg-info/
|
||||
dist/
|
||||
.pytest_cache/
|
||||
.superpowers/
|
||||
|
||||
# cf-orch private profiles (commit on personal/heimdall branch only)
|
||||
circuitforge_core/resources/profiles/private/
|
||||
|
|
|
|||
0
circuitforge_core/resources/__init__.py
Normal file
0
circuitforge_core/resources/__init__.py
Normal file
0
circuitforge_core/resources/agent/__init__.py
Normal file
0
circuitforge_core/resources/agent/__init__.py
Normal file
60
circuitforge_core/resources/agent/app.py
Normal file
60
circuitforge_core/resources/agent/app.py
Normal file
|
|
@ -0,0 +1,60 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from fastapi import FastAPI
|
||||
from pydantic import BaseModel
|
||||
|
||||
from circuitforge_core.resources.agent.eviction_executor import EvictionExecutor
|
||||
from circuitforge_core.resources.agent.gpu_monitor import GpuMonitor
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class EvictRequest(BaseModel):
|
||||
pid: int
|
||||
grace_period_s: float = 5.0
|
||||
|
||||
|
||||
def create_agent_app(
|
||||
node_id: str,
|
||||
monitor: GpuMonitor | None = None,
|
||||
executor: EvictionExecutor | None = None,
|
||||
) -> FastAPI:
|
||||
_monitor = monitor or GpuMonitor()
|
||||
_executor = executor or EvictionExecutor()
|
||||
|
||||
app = FastAPI(title=f"cf-orch-agent [{node_id}]")
|
||||
|
||||
@app.get("/health")
|
||||
def health() -> dict[str, Any]:
|
||||
return {"status": "ok", "node_id": node_id}
|
||||
|
||||
@app.get("/gpu-info")
|
||||
def gpu_info() -> dict[str, Any]:
|
||||
gpus = _monitor.poll()
|
||||
return {
|
||||
"node_id": node_id,
|
||||
"gpus": [
|
||||
{
|
||||
"gpu_id": g.gpu_id,
|
||||
"name": g.name,
|
||||
"vram_total_mb": g.vram_total_mb,
|
||||
"vram_used_mb": g.vram_used_mb,
|
||||
"vram_free_mb": g.vram_free_mb,
|
||||
}
|
||||
for g in gpus
|
||||
],
|
||||
}
|
||||
|
||||
@app.post("/evict")
|
||||
def evict(req: EvictRequest) -> dict[str, Any]:
|
||||
result = _executor.evict_pid(pid=req.pid, grace_period_s=req.grace_period_s)
|
||||
return {
|
||||
"success": result.success,
|
||||
"method": result.method,
|
||||
"message": result.message,
|
||||
}
|
||||
|
||||
return app
|
||||
85
circuitforge_core/resources/agent/eviction_executor.py
Normal file
85
circuitforge_core/resources/agent/eviction_executor.py
Normal file
|
|
@ -0,0 +1,85 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import os
|
||||
import signal
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
|
||||
import psutil
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_DEFAULT_GRACE_S = 5.0
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class EvictionResult:
|
||||
success: bool
|
||||
method: str # "sigterm", "sigkill", "already_gone", "not_found", "error"
|
||||
message: str
|
||||
|
||||
|
||||
class EvictionExecutor:
|
||||
def __init__(self, grace_period_s: float = _DEFAULT_GRACE_S) -> None:
|
||||
self._default_grace = grace_period_s
|
||||
|
||||
def evict_pid(
|
||||
self,
|
||||
pid: int,
|
||||
grace_period_s: float | None = None,
|
||||
) -> EvictionResult:
|
||||
grace = grace_period_s if grace_period_s is not None else self._default_grace
|
||||
|
||||
if pid <= 0:
|
||||
return EvictionResult(
|
||||
success=False, method="error",
|
||||
message=f"Refusing to signal invalid PID {pid}"
|
||||
)
|
||||
|
||||
if not psutil.pid_exists(pid):
|
||||
return EvictionResult(
|
||||
success=False, method="not_found",
|
||||
message=f"PID {pid} not found"
|
||||
)
|
||||
|
||||
try:
|
||||
os.kill(pid, signal.SIGTERM)
|
||||
except ProcessLookupError:
|
||||
return EvictionResult(
|
||||
success=True, method="already_gone",
|
||||
message=f"PID {pid} vanished before SIGTERM"
|
||||
)
|
||||
except PermissionError as exc:
|
||||
return EvictionResult(
|
||||
success=False, method="error",
|
||||
message=f"Permission denied terminating PID {pid}: {exc}"
|
||||
)
|
||||
|
||||
# Wait for grace period
|
||||
deadline = time.monotonic() + grace
|
||||
while time.monotonic() < deadline:
|
||||
if not psutil.pid_exists(pid):
|
||||
logger.info("PID %d exited cleanly after SIGTERM", pid)
|
||||
return EvictionResult(
|
||||
success=True, method="sigterm",
|
||||
message=f"PID {pid} exited after SIGTERM"
|
||||
)
|
||||
time.sleep(0.05)
|
||||
|
||||
# Escalate to SIGKILL
|
||||
if psutil.pid_exists(pid):
|
||||
try:
|
||||
os.kill(pid, signal.SIGKILL)
|
||||
logger.warning("PID %d required SIGKILL", pid)
|
||||
return EvictionResult(
|
||||
success=True, method="sigkill",
|
||||
message=f"PID {pid} killed with SIGKILL"
|
||||
)
|
||||
except ProcessLookupError:
|
||||
pass
|
||||
|
||||
return EvictionResult(
|
||||
success=True, method="sigkill",
|
||||
message=f"PID {pid} is gone"
|
||||
)
|
||||
52
circuitforge_core/resources/agent/gpu_monitor.py
Normal file
52
circuitforge_core/resources/agent/gpu_monitor.py
Normal file
|
|
@ -0,0 +1,52 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import subprocess
|
||||
|
||||
from circuitforge_core.resources.models import GpuInfo
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_NVIDIA_SMI_CMD = [
|
||||
"nvidia-smi",
|
||||
"--query-gpu=index,name,memory.total,memory.used,memory.free",
|
||||
"--format=csv,noheader,nounits",
|
||||
]
|
||||
|
||||
|
||||
class GpuMonitor:
|
||||
def poll(self) -> list[GpuInfo]:
|
||||
try:
|
||||
result = subprocess.run(
|
||||
_NVIDIA_SMI_CMD,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=5,
|
||||
)
|
||||
except (FileNotFoundError, subprocess.TimeoutExpired) as exc:
|
||||
logger.warning("nvidia-smi unavailable: %s", exc)
|
||||
return []
|
||||
|
||||
if result.returncode != 0:
|
||||
logger.warning("nvidia-smi exited %d", result.returncode)
|
||||
return []
|
||||
|
||||
return self._parse(result.stdout)
|
||||
|
||||
def _parse(self, output: str) -> list[GpuInfo]:
|
||||
gpus: list[GpuInfo] = []
|
||||
for line in output.strip().splitlines():
|
||||
parts = [p.strip() for p in line.split(",")]
|
||||
if len(parts) != 5:
|
||||
continue
|
||||
try:
|
||||
gpus.append(GpuInfo(
|
||||
gpu_id=int(parts[0]),
|
||||
name=parts[1],
|
||||
vram_total_mb=int(parts[2]),
|
||||
vram_used_mb=int(parts[3]),
|
||||
vram_free_mb=int(parts[4]),
|
||||
))
|
||||
except ValueError:
|
||||
logger.debug("Skipping malformed nvidia-smi line: %r", line)
|
||||
return gpus
|
||||
143
circuitforge_core/resources/cli.py
Normal file
143
circuitforge_core/resources/cli.py
Normal file
|
|
@ -0,0 +1,143 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Annotated, Optional
|
||||
|
||||
import typer
|
||||
import uvicorn
|
||||
|
||||
app = typer.Typer(name="cf-orch", help="CircuitForge GPU resource orchestrator")
|
||||
|
||||
_SYSTEMD_UNIT_PATH = Path("/etc/systemd/system/cf-orch.service")
|
||||
|
||||
_SYSTEMD_UNIT_TEMPLATE = """\
|
||||
[Unit]
|
||||
Description=CircuitForge GPU Resource Orchestrator
|
||||
After=network.target
|
||||
|
||||
[Service]
|
||||
Type=simple
|
||||
ExecStart={python} -m circuitforge_core.resources.cli start
|
||||
Restart=on-failure
|
||||
RestartSec=5
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
||||
"""
|
||||
|
||||
|
||||
@app.command()
|
||||
def start(
|
||||
profile: Annotated[Optional[Path], typer.Option(help="Profile YAML path")] = None,
|
||||
host: str = "0.0.0.0",
|
||||
port: int = 7700,
|
||||
agent_port: int = 7701,
|
||||
) -> None:
|
||||
"""Start the cf-orch coordinator (auto-detects GPU profile if not specified)."""
|
||||
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
|
||||
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
|
||||
from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor
|
||||
from circuitforge_core.resources.coordinator.app import create_coordinator_app
|
||||
from circuitforge_core.resources.agent.gpu_monitor import GpuMonitor
|
||||
|
||||
lease_manager = LeaseManager()
|
||||
profile_registry = ProfileRegistry()
|
||||
supervisor = AgentSupervisor(lease_manager=lease_manager)
|
||||
|
||||
monitor = GpuMonitor()
|
||||
gpus = monitor.poll()
|
||||
if not gpus:
|
||||
typer.echo(
|
||||
"Warning: no GPUs detected via nvidia-smi — coordinator running with 0 VRAM"
|
||||
)
|
||||
else:
|
||||
for gpu in gpus:
|
||||
lease_manager.register_gpu("local", gpu.gpu_id, gpu.vram_total_mb)
|
||||
typer.echo(f"Detected {len(gpus)} GPU(s)")
|
||||
|
||||
if profile:
|
||||
active_profile = profile_registry.load(profile)
|
||||
typer.echo(f"Using profile: {active_profile.name} (from {profile})")
|
||||
else:
|
||||
active_profile = (
|
||||
profile_registry.auto_detect(gpus)
|
||||
if gpus
|
||||
else profile_registry.list_public()[-1]
|
||||
)
|
||||
typer.echo(f"Auto-selected profile: {active_profile.name}")
|
||||
|
||||
coordinator_app = create_coordinator_app(
|
||||
lease_manager=lease_manager,
|
||||
profile_registry=profile_registry,
|
||||
agent_supervisor=supervisor,
|
||||
)
|
||||
|
||||
typer.echo(f"Starting cf-orch coordinator on {host}:{port}")
|
||||
uvicorn.run(coordinator_app, host=host, port=port)
|
||||
|
||||
|
||||
@app.command()
|
||||
def agent(
|
||||
coordinator: str = "http://localhost:7700",
|
||||
node_id: str = "local",
|
||||
host: str = "0.0.0.0",
|
||||
port: int = 7701,
|
||||
) -> None:
|
||||
"""Start a cf-orch node agent (for remote nodes like Navi, Huginn)."""
|
||||
from circuitforge_core.resources.agent.app import create_agent_app
|
||||
|
||||
agent_app = create_agent_app(node_id=node_id)
|
||||
typer.echo(f"Starting cf-orch agent [{node_id}] on {host}:{port}")
|
||||
uvicorn.run(agent_app, host=host, port=port)
|
||||
|
||||
|
||||
@app.command()
|
||||
def status(coordinator: str = "http://localhost:7700") -> None:
|
||||
"""Show GPU and lease status from the coordinator."""
|
||||
import httpx
|
||||
|
||||
try:
|
||||
resp = httpx.get(f"{coordinator}/api/nodes", timeout=5.0)
|
||||
resp.raise_for_status()
|
||||
nodes = resp.json().get("nodes", [])
|
||||
for node in nodes:
|
||||
typer.echo(f"\nNode: {node['node_id']}")
|
||||
for gpu in node.get("gpus", []):
|
||||
typer.echo(
|
||||
f" GPU {gpu['gpu_id']}: {gpu['name']} — "
|
||||
f"{gpu['vram_used_mb']}/{gpu['vram_total_mb']} MB used"
|
||||
)
|
||||
except Exception as exc:
|
||||
typer.echo(f"Coordinator unreachable at {coordinator}: {exc}", err=True)
|
||||
raise typer.Exit(1)
|
||||
|
||||
|
||||
@app.command("install-service")
|
||||
def install_service(
|
||||
dry_run: bool = typer.Option(
|
||||
False, "--dry-run", help="Print unit file without writing"
|
||||
),
|
||||
) -> None:
|
||||
"""Write a systemd unit file for cf-orch (requires root)."""
|
||||
python = sys.executable
|
||||
unit_content = _SYSTEMD_UNIT_TEMPLATE.format(python=python)
|
||||
if dry_run:
|
||||
typer.echo(f"Would write to {_SYSTEMD_UNIT_PATH}:\n")
|
||||
typer.echo(unit_content)
|
||||
return
|
||||
try:
|
||||
_SYSTEMD_UNIT_PATH.write_text(unit_content)
|
||||
typer.echo(f"Written: {_SYSTEMD_UNIT_PATH}")
|
||||
typer.echo(
|
||||
"Run: sudo systemctl daemon-reload && sudo systemctl enable --now cf-orch"
|
||||
)
|
||||
except PermissionError:
|
||||
typer.echo(
|
||||
f"Permission denied writing to {_SYSTEMD_UNIT_PATH}. Run as root.", err=True
|
||||
)
|
||||
raise typer.Exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
app()
|
||||
44
circuitforge_core/resources/compose.yml
Normal file
44
circuitforge_core/resources/compose.yml
Normal file
|
|
@ -0,0 +1,44 @@
|
|||
# circuitforge_core/resources/compose.yml
|
||||
# One-command cf-orch deployment for Docker self-hosters:
|
||||
# docker compose -f path/to/compose.yml up cf-orch-coordinator
|
||||
|
||||
services:
|
||||
cf-orch-coordinator:
|
||||
image: python:3.12-slim
|
||||
command: >
|
||||
sh -c "pip install 'circuitforge-core[orch]' &&
|
||||
cf-orch start --host 0.0.0.0 --port 7700"
|
||||
ports:
|
||||
- "7700:7700"
|
||||
volumes:
|
||||
- /run/docker.sock:/var/run/docker.sock:ro
|
||||
- cf-orch-data:/data
|
||||
environment:
|
||||
- CFORCH_PROFILE=${CFORCH_PROFILE:-}
|
||||
restart: unless-stopped
|
||||
devices:
|
||||
- /dev/nvidia0:/dev/nvidia0
|
||||
- /dev/nvidiactl:/dev/nvidiactl
|
||||
runtime: nvidia
|
||||
|
||||
cf-orch-agent:
|
||||
image: python:3.12-slim
|
||||
command: >
|
||||
sh -c "pip install 'circuitforge-core[orch]' &&
|
||||
cf-orch agent --coordinator http://cf-orch-coordinator:7700
|
||||
--node-id ${CFORCH_NODE_ID:-local}
|
||||
--host 0.0.0.0 --port 7701"
|
||||
ports:
|
||||
- "7701:7701"
|
||||
depends_on:
|
||||
- cf-orch-coordinator
|
||||
environment:
|
||||
- CFORCH_NODE_ID=${CFORCH_NODE_ID:-local}
|
||||
restart: unless-stopped
|
||||
devices:
|
||||
- /dev/nvidia0:/dev/nvidia0
|
||||
- /dev/nvidiactl:/dev/nvidiactl
|
||||
runtime: nvidia
|
||||
|
||||
volumes:
|
||||
cf-orch-data:
|
||||
0
circuitforge_core/resources/coordinator/__init__.py
Normal file
0
circuitforge_core/resources/coordinator/__init__.py
Normal file
101
circuitforge_core/resources/coordinator/agent_supervisor.py
Normal file
101
circuitforge_core/resources/coordinator/agent_supervisor.py
Normal file
|
|
@ -0,0 +1,101 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
import httpx
|
||||
|
||||
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
|
||||
from circuitforge_core.resources.models import GpuInfo, NodeInfo
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_HEARTBEAT_INTERVAL_S = 10.0
|
||||
_AGENT_TIMEOUT_S = 5.0
|
||||
|
||||
|
||||
@dataclass
|
||||
class AgentRecord:
|
||||
node_id: str
|
||||
agent_url: str
|
||||
last_seen: float = field(default_factory=time.time)
|
||||
gpus: list[GpuInfo] = field(default_factory=list)
|
||||
online: bool = False
|
||||
|
||||
|
||||
class AgentSupervisor:
|
||||
def __init__(self, lease_manager: LeaseManager) -> None:
|
||||
self._agents: dict[str, AgentRecord] = {}
|
||||
self._lease_manager = lease_manager
|
||||
self._running = False
|
||||
|
||||
def register(self, node_id: str, agent_url: str) -> None:
|
||||
if node_id not in self._agents:
|
||||
self._agents[node_id] = AgentRecord(node_id=node_id, agent_url=agent_url)
|
||||
logger.info("Registered agent node: %s @ %s", node_id, agent_url)
|
||||
|
||||
def get_node_info(self, node_id: str) -> NodeInfo | None:
|
||||
record = self._agents.get(node_id)
|
||||
if record is None:
|
||||
return None
|
||||
return NodeInfo(
|
||||
node_id=record.node_id,
|
||||
agent_url=record.agent_url,
|
||||
gpus=record.gpus,
|
||||
last_heartbeat=record.last_seen,
|
||||
)
|
||||
|
||||
def all_nodes(self) -> list[NodeInfo]:
|
||||
return [
|
||||
NodeInfo(
|
||||
node_id=r.node_id,
|
||||
agent_url=r.agent_url,
|
||||
gpus=r.gpus,
|
||||
last_heartbeat=r.last_seen,
|
||||
)
|
||||
for r in self._agents.values()
|
||||
]
|
||||
|
||||
async def poll_agent(self, node_id: str) -> bool:
|
||||
record = self._agents.get(node_id)
|
||||
if record is None:
|
||||
return False
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=_AGENT_TIMEOUT_S) as client:
|
||||
resp = await client.get(f"{record.agent_url}/gpu-info")
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
gpus = [
|
||||
GpuInfo(
|
||||
gpu_id=g["gpu_id"],
|
||||
name=g["name"],
|
||||
vram_total_mb=g["vram_total_mb"],
|
||||
vram_used_mb=g["vram_used_mb"],
|
||||
vram_free_mb=g["vram_free_mb"],
|
||||
)
|
||||
for g in data.get("gpus", [])
|
||||
]
|
||||
record.gpus = gpus
|
||||
record.last_seen = time.time()
|
||||
record.online = True
|
||||
for gpu in gpus:
|
||||
self._lease_manager.register_gpu(node_id, gpu.gpu_id, gpu.vram_total_mb)
|
||||
return True
|
||||
except Exception as exc:
|
||||
logger.warning("Agent %s unreachable: %s", node_id, exc)
|
||||
record.online = False
|
||||
return False
|
||||
|
||||
async def poll_all(self) -> None:
|
||||
await asyncio.gather(*[self.poll_agent(nid) for nid in self._agents])
|
||||
|
||||
async def run_heartbeat_loop(self) -> None:
|
||||
self._running = True
|
||||
while self._running:
|
||||
await self.poll_all()
|
||||
await asyncio.sleep(_HEARTBEAT_INTERVAL_S)
|
||||
|
||||
def stop(self) -> None:
|
||||
self._running = False
|
||||
129
circuitforge_core/resources/coordinator/app.py
Normal file
129
circuitforge_core/resources/coordinator/app.py
Normal file
|
|
@ -0,0 +1,129 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from fastapi import FastAPI, HTTPException
|
||||
from pydantic import BaseModel
|
||||
|
||||
from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor
|
||||
from circuitforge_core.resources.coordinator.eviction_engine import EvictionEngine
|
||||
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
|
||||
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
|
||||
|
||||
|
||||
class LeaseRequest(BaseModel):
|
||||
node_id: str
|
||||
gpu_id: int
|
||||
mb: int
|
||||
service: str
|
||||
priority: int = 2
|
||||
ttl_s: float = 0.0
|
||||
|
||||
|
||||
def create_coordinator_app(
|
||||
lease_manager: LeaseManager,
|
||||
profile_registry: ProfileRegistry,
|
||||
agent_supervisor: AgentSupervisor,
|
||||
) -> FastAPI:
|
||||
eviction_engine = EvictionEngine(lease_manager=lease_manager)
|
||||
|
||||
app = FastAPI(title="cf-orch-coordinator")
|
||||
|
||||
@app.get("/api/health")
|
||||
def health() -> dict[str, Any]:
|
||||
return {"status": "ok"}
|
||||
|
||||
@app.get("/api/nodes")
|
||||
def get_nodes() -> dict[str, Any]:
|
||||
nodes = agent_supervisor.all_nodes()
|
||||
return {
|
||||
"nodes": [
|
||||
{
|
||||
"node_id": n.node_id,
|
||||
"agent_url": n.agent_url,
|
||||
"last_heartbeat": n.last_heartbeat,
|
||||
"gpus": [
|
||||
{
|
||||
"gpu_id": g.gpu_id,
|
||||
"name": g.name,
|
||||
"vram_total_mb": g.vram_total_mb,
|
||||
"vram_used_mb": g.vram_used_mb,
|
||||
"vram_free_mb": g.vram_free_mb,
|
||||
}
|
||||
for g in n.gpus
|
||||
],
|
||||
}
|
||||
for n in nodes
|
||||
]
|
||||
}
|
||||
|
||||
@app.get("/api/profiles")
|
||||
def get_profiles() -> dict[str, Any]:
|
||||
return {
|
||||
"profiles": [
|
||||
{"name": p.name, "vram_total_mb": p.vram_total_mb}
|
||||
for p in profile_registry.list_public()
|
||||
]
|
||||
}
|
||||
|
||||
@app.get("/api/leases")
|
||||
def get_leases() -> dict[str, Any]:
|
||||
return {
|
||||
"leases": [
|
||||
{
|
||||
"lease_id": lease.lease_id,
|
||||
"node_id": lease.node_id,
|
||||
"gpu_id": lease.gpu_id,
|
||||
"mb_granted": lease.mb_granted,
|
||||
"holder_service": lease.holder_service,
|
||||
"priority": lease.priority,
|
||||
"expires_at": lease.expires_at,
|
||||
}
|
||||
for lease in lease_manager.all_leases()
|
||||
]
|
||||
}
|
||||
|
||||
@app.post("/api/leases")
|
||||
async def request_lease(req: LeaseRequest) -> dict[str, Any]:
|
||||
node_info = agent_supervisor.get_node_info(req.node_id)
|
||||
if node_info is None:
|
||||
raise HTTPException(
|
||||
status_code=422,
|
||||
detail=f"Unknown node_id {req.node_id!r} — node not registered",
|
||||
)
|
||||
agent_url = node_info.agent_url
|
||||
|
||||
lease = await eviction_engine.request_lease(
|
||||
node_id=req.node_id,
|
||||
gpu_id=req.gpu_id,
|
||||
mb=req.mb,
|
||||
service=req.service,
|
||||
priority=req.priority,
|
||||
agent_url=agent_url,
|
||||
ttl_s=req.ttl_s,
|
||||
)
|
||||
if lease is None:
|
||||
raise HTTPException(
|
||||
status_code=503,
|
||||
detail="Insufficient VRAM — no eviction candidates available",
|
||||
)
|
||||
return {
|
||||
"lease": {
|
||||
"lease_id": lease.lease_id,
|
||||
"node_id": lease.node_id,
|
||||
"gpu_id": lease.gpu_id,
|
||||
"mb_granted": lease.mb_granted,
|
||||
"holder_service": lease.holder_service,
|
||||
"priority": lease.priority,
|
||||
"expires_at": lease.expires_at,
|
||||
}
|
||||
}
|
||||
|
||||
@app.delete("/api/leases/{lease_id}")
|
||||
async def release_lease(lease_id: str) -> dict[str, Any]:
|
||||
released = await lease_manager.release(lease_id)
|
||||
if not released:
|
||||
raise HTTPException(status_code=404, detail=f"Lease {lease_id!r} not found")
|
||||
return {"released": True, "lease_id": lease_id}
|
||||
|
||||
return app
|
||||
81
circuitforge_core/resources/coordinator/eviction_engine.py
Normal file
81
circuitforge_core/resources/coordinator/eviction_engine.py
Normal file
|
|
@ -0,0 +1,81 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
|
||||
from circuitforge_core.resources.models import VRAMLease
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_DEFAULT_EVICTION_TIMEOUT_S = 10.0
|
||||
|
||||
|
||||
class EvictionEngine:
|
||||
def __init__(
|
||||
self,
|
||||
lease_manager: LeaseManager,
|
||||
eviction_timeout_s: float = _DEFAULT_EVICTION_TIMEOUT_S,
|
||||
) -> None:
|
||||
self.lease_manager = lease_manager
|
||||
self._timeout = eviction_timeout_s
|
||||
|
||||
async def request_lease(
|
||||
self,
|
||||
node_id: str,
|
||||
gpu_id: int,
|
||||
mb: int,
|
||||
service: str,
|
||||
priority: int,
|
||||
agent_url: str,
|
||||
ttl_s: float = 0.0,
|
||||
) -> VRAMLease | None:
|
||||
# Fast path: enough free VRAM
|
||||
lease = await self.lease_manager.try_grant(
|
||||
node_id, gpu_id, mb, service, priority, ttl_s
|
||||
)
|
||||
if lease is not None:
|
||||
return lease
|
||||
|
||||
# Find eviction candidates
|
||||
candidates = self.lease_manager.get_eviction_candidates(
|
||||
node_id=node_id, gpu_id=gpu_id,
|
||||
needed_mb=mb, requester_priority=priority,
|
||||
)
|
||||
if not candidates:
|
||||
logger.info(
|
||||
"No eviction candidates for %s on %s:GPU%d (%dMB needed)",
|
||||
service, node_id, gpu_id, mb,
|
||||
)
|
||||
return None
|
||||
|
||||
# Evict candidates
|
||||
freed_mb = sum(c.mb_granted for c in candidates)
|
||||
logger.info(
|
||||
"Evicting %d lease(s) to free %dMB for %s",
|
||||
len(candidates), freed_mb, service,
|
||||
)
|
||||
for candidate in candidates:
|
||||
await self._evict_lease(candidate, agent_url)
|
||||
|
||||
# Wait for evictions to free up VRAM (poll with timeout)
|
||||
loop = asyncio.get_running_loop()
|
||||
deadline = loop.time() + self._timeout
|
||||
while loop.time() < deadline:
|
||||
lease = await self.lease_manager.try_grant(
|
||||
node_id, gpu_id, mb, service, priority, ttl_s
|
||||
)
|
||||
if lease is not None:
|
||||
return lease
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
logger.warning("Eviction timed out for %s after %.1fs", service, self._timeout)
|
||||
return None
|
||||
|
||||
async def _evict_lease(self, lease: VRAMLease, agent_url: str) -> None:
|
||||
"""Release lease accounting. Process-level eviction deferred to Plan B."""
|
||||
await self.lease_manager.release(lease.lease_id)
|
||||
|
||||
async def _call_agent_evict(self, agent_url: str, lease: VRAMLease) -> bool:
|
||||
"""POST /evict to the agent. Stub for v1 — real process lookup in Plan B."""
|
||||
return True
|
||||
88
circuitforge_core/resources/coordinator/lease_manager.py
Normal file
88
circuitforge_core/resources/coordinator/lease_manager.py
Normal file
|
|
@ -0,0 +1,88 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from collections import defaultdict
|
||||
|
||||
from circuitforge_core.resources.models import VRAMLease
|
||||
|
||||
|
||||
class LeaseManager:
|
||||
def __init__(self) -> None:
|
||||
self._leases: dict[str, VRAMLease] = {}
|
||||
self._gpu_total: dict[tuple[str, int], int] = {}
|
||||
self._gpu_used: dict[tuple[str, int], int] = defaultdict(int)
|
||||
self._lock = asyncio.Lock()
|
||||
|
||||
def register_gpu(self, node_id: str, gpu_id: int, total_mb: int) -> None:
|
||||
self._gpu_total[(node_id, gpu_id)] = total_mb
|
||||
|
||||
def gpu_total_mb(self, node_id: str, gpu_id: int) -> int:
|
||||
return self._gpu_total.get((node_id, gpu_id), 0)
|
||||
|
||||
def used_mb(self, node_id: str, gpu_id: int) -> int:
|
||||
return self._gpu_used[(node_id, gpu_id)]
|
||||
|
||||
async def try_grant(
|
||||
self,
|
||||
node_id: str,
|
||||
gpu_id: int,
|
||||
mb: int,
|
||||
service: str,
|
||||
priority: int,
|
||||
ttl_s: float = 0.0,
|
||||
) -> VRAMLease | None:
|
||||
async with self._lock:
|
||||
total = self._gpu_total.get((node_id, gpu_id), 0)
|
||||
used = self._gpu_used[(node_id, gpu_id)]
|
||||
if total - used < mb:
|
||||
return None
|
||||
lease = VRAMLease.create(
|
||||
gpu_id=gpu_id, node_id=node_id, mb=mb,
|
||||
service=service, priority=priority, ttl_s=ttl_s,
|
||||
)
|
||||
self._leases[lease.lease_id] = lease
|
||||
self._gpu_used[(node_id, gpu_id)] += mb
|
||||
return lease
|
||||
|
||||
async def release(self, lease_id: str) -> bool:
|
||||
async with self._lock:
|
||||
lease = self._leases.pop(lease_id, None)
|
||||
if lease is None:
|
||||
return False
|
||||
self._gpu_used[(lease.node_id, lease.gpu_id)] -= lease.mb_granted
|
||||
return True
|
||||
|
||||
def get_eviction_candidates(
|
||||
self,
|
||||
node_id: str,
|
||||
gpu_id: int,
|
||||
needed_mb: int,
|
||||
requester_priority: int,
|
||||
) -> list[VRAMLease]:
|
||||
candidates = [
|
||||
lease for lease in self._leases.values()
|
||||
if lease.node_id == node_id
|
||||
and lease.gpu_id == gpu_id
|
||||
and lease.priority > requester_priority
|
||||
]
|
||||
candidates.sort(key=lambda lease: lease.priority, reverse=True)
|
||||
selected: list[VRAMLease] = []
|
||||
freed = 0
|
||||
for candidate in candidates:
|
||||
selected.append(candidate)
|
||||
freed += candidate.mb_granted
|
||||
if freed >= needed_mb:
|
||||
break
|
||||
return selected
|
||||
|
||||
def list_leases(
|
||||
self, node_id: str | None = None, gpu_id: int | None = None
|
||||
) -> list[VRAMLease]:
|
||||
return [
|
||||
lease for lease in self._leases.values()
|
||||
if (node_id is None or lease.node_id == node_id)
|
||||
and (gpu_id is None or lease.gpu_id == gpu_id)
|
||||
]
|
||||
|
||||
def all_leases(self) -> list[VRAMLease]:
|
||||
return list(self._leases.values())
|
||||
65
circuitforge_core/resources/coordinator/profile_registry.py
Normal file
65
circuitforge_core/resources/coordinator/profile_registry.py
Normal file
|
|
@ -0,0 +1,65 @@
|
|||
# circuitforge_core/resources/coordinator/profile_registry.py
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
from circuitforge_core.resources.models import GpuInfo
|
||||
from circuitforge_core.resources.profiles.schema import GpuProfile, load_profile
|
||||
|
||||
_PUBLIC_DIR = Path(__file__).parent.parent / "profiles" / "public"
|
||||
|
||||
# VRAM thresholds for public profile selection (MB)
|
||||
_PROFILE_THRESHOLDS = [
|
||||
(22000, "single-gpu-24gb"),
|
||||
(14000, "single-gpu-16gb"),
|
||||
(8000, "single-gpu-8gb"),
|
||||
(5500, "single-gpu-6gb"),
|
||||
(3500, "single-gpu-4gb"),
|
||||
(0, "single-gpu-2gb"),
|
||||
]
|
||||
|
||||
_log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ProfileRegistry:
|
||||
def __init__(self, extra_dirs: list[Path] | None = None) -> None:
|
||||
self._profiles: dict[str, GpuProfile] = {}
|
||||
self._load_dir(_PUBLIC_DIR)
|
||||
for d in (extra_dirs or []):
|
||||
if d.exists():
|
||||
self._load_dir(d)
|
||||
|
||||
def _load_dir(self, directory: Path) -> None:
|
||||
for yaml_file in directory.glob("*.yaml"):
|
||||
try:
|
||||
profile = load_profile(yaml_file)
|
||||
self._profiles[profile.name] = profile
|
||||
except Exception as exc:
|
||||
_log.warning("Skipping %s: %s", yaml_file, exc)
|
||||
|
||||
def load(self, path: Path) -> GpuProfile:
|
||||
profile = load_profile(path)
|
||||
self._profiles[profile.name] = profile
|
||||
return profile
|
||||
|
||||
def list_public(self) -> list[GpuProfile]:
|
||||
# CPU profiles (cpu-*) are intentionally excluded — this endpoint
|
||||
# is used to match GPU hardware. CPU inference nodes self-select
|
||||
# their profile via the CLI and are not listed for lease matching.
|
||||
return [
|
||||
p for p in self._profiles.values()
|
||||
if p.name.startswith("single-gpu-")
|
||||
]
|
||||
|
||||
def get(self, name: str) -> GpuProfile | None:
|
||||
return self._profiles.get(name)
|
||||
|
||||
def auto_detect(self, gpus: list[GpuInfo]) -> GpuProfile:
|
||||
primary_vram = gpus[0].vram_total_mb if gpus else 0
|
||||
for threshold_mb, profile_name in _PROFILE_THRESHOLDS:
|
||||
if primary_vram >= threshold_mb:
|
||||
profile = self._profiles.get(profile_name)
|
||||
if profile:
|
||||
return profile
|
||||
return self._profiles["single-gpu-2gb"]
|
||||
56
circuitforge_core/resources/models.py
Normal file
56
circuitforge_core/resources/models.py
Normal file
|
|
@ -0,0 +1,56 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
import uuid
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class VRAMLease:
|
||||
lease_id: str
|
||||
gpu_id: int
|
||||
node_id: str
|
||||
mb_granted: int
|
||||
holder_service: str
|
||||
priority: int
|
||||
expires_at: float # unix timestamp; 0.0 = no expiry
|
||||
|
||||
@classmethod
|
||||
def create(
|
||||
cls,
|
||||
gpu_id: int,
|
||||
node_id: str,
|
||||
mb: int,
|
||||
service: str,
|
||||
priority: int,
|
||||
ttl_s: float = 0.0,
|
||||
) -> VRAMLease:
|
||||
return cls(
|
||||
lease_id=str(uuid.uuid4()),
|
||||
gpu_id=gpu_id,
|
||||
node_id=node_id,
|
||||
mb_granted=mb,
|
||||
holder_service=service,
|
||||
priority=priority,
|
||||
expires_at=time.time() + ttl_s if ttl_s > 0.0 else 0.0,
|
||||
)
|
||||
|
||||
def is_expired(self) -> bool:
|
||||
return self.expires_at > 0.0 and time.time() > self.expires_at
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class GpuInfo:
|
||||
gpu_id: int
|
||||
name: str
|
||||
vram_total_mb: int
|
||||
vram_used_mb: int
|
||||
vram_free_mb: int
|
||||
|
||||
|
||||
@dataclass
|
||||
class NodeInfo:
|
||||
node_id: str
|
||||
agent_url: str
|
||||
gpus: list[GpuInfo]
|
||||
last_heartbeat: float = field(default_factory=time.time)
|
||||
0
circuitforge_core/resources/profiles/__init__.py
Normal file
0
circuitforge_core/resources/profiles/__init__.py
Normal file
33
circuitforge_core/resources/profiles/public/cpu-16gb.yaml
Normal file
33
circuitforge_core/resources/profiles/public/cpu-16gb.yaml
Normal file
|
|
@ -0,0 +1,33 @@
|
|||
schema_version: 1
|
||||
name: cpu-16gb
|
||||
eviction_timeout_s: 30.0
|
||||
services:
|
||||
ollama:
|
||||
max_mb: 0
|
||||
priority: 1
|
||||
cf-stt:
|
||||
max_mb: 0
|
||||
priority: 2
|
||||
shared: true
|
||||
max_concurrent: 1
|
||||
backend: moonshine
|
||||
cf-tts:
|
||||
max_mb: 0
|
||||
priority: 2
|
||||
shared: true
|
||||
max_concurrent: 1
|
||||
cf-embed:
|
||||
max_mb: 0
|
||||
priority: 2
|
||||
shared: true
|
||||
max_concurrent: 2
|
||||
always_on: true
|
||||
cf-classify:
|
||||
max_mb: 0
|
||||
priority: 2
|
||||
shared: true
|
||||
max_concurrent: 2
|
||||
always_on: true
|
||||
model_size_hints:
|
||||
llm_max_params: 3b-q4
|
||||
image_gen_max: none
|
||||
33
circuitforge_core/resources/profiles/public/cpu-32gb.yaml
Normal file
33
circuitforge_core/resources/profiles/public/cpu-32gb.yaml
Normal file
|
|
@ -0,0 +1,33 @@
|
|||
schema_version: 1
|
||||
name: cpu-32gb
|
||||
eviction_timeout_s: 30.0
|
||||
services:
|
||||
ollama:
|
||||
max_mb: 0
|
||||
priority: 1
|
||||
cf-stt:
|
||||
max_mb: 0
|
||||
priority: 2
|
||||
shared: true
|
||||
max_concurrent: 2
|
||||
backend: faster-whisper
|
||||
cf-tts:
|
||||
max_mb: 0
|
||||
priority: 2
|
||||
shared: true
|
||||
max_concurrent: 2
|
||||
cf-embed:
|
||||
max_mb: 0
|
||||
priority: 2
|
||||
shared: true
|
||||
max_concurrent: 4
|
||||
always_on: true
|
||||
cf-classify:
|
||||
max_mb: 0
|
||||
priority: 2
|
||||
shared: true
|
||||
max_concurrent: 4
|
||||
always_on: true
|
||||
model_size_hints:
|
||||
llm_max_params: 7b-q4
|
||||
image_gen_max: none
|
||||
|
|
@ -0,0 +1,45 @@
|
|||
schema_version: 1
|
||||
name: single-gpu-16gb
|
||||
vram_total_mb: 16384
|
||||
eviction_timeout_s: 10.0
|
||||
services:
|
||||
vllm:
|
||||
max_mb: 12288
|
||||
priority: 1
|
||||
ollama:
|
||||
max_mb: 12288
|
||||
priority: 1
|
||||
cf-vision:
|
||||
max_mb: 3072
|
||||
priority: 2
|
||||
shared: true
|
||||
max_concurrent: 4
|
||||
cf-stt:
|
||||
max_mb: 1200
|
||||
priority: 2
|
||||
shared: true
|
||||
max_concurrent: 3
|
||||
backend: parakeet-tdt
|
||||
cf-tts:
|
||||
max_mb: 1024
|
||||
priority: 2
|
||||
shared: true
|
||||
max_concurrent: 3
|
||||
cf-embed:
|
||||
max_mb: 512
|
||||
priority: 2
|
||||
shared: true
|
||||
max_concurrent: 6
|
||||
always_on: true
|
||||
cf-classify:
|
||||
max_mb: 512
|
||||
priority: 2
|
||||
shared: true
|
||||
max_concurrent: 6
|
||||
always_on: true
|
||||
comfyui:
|
||||
max_mb: 14336
|
||||
priority: 4
|
||||
model_size_hints:
|
||||
llm_max_params: 34b
|
||||
image_gen_max: flux-dev-fp8
|
||||
|
|
@ -0,0 +1,45 @@
|
|||
schema_version: 1
|
||||
name: single-gpu-24gb
|
||||
vram_total_mb: 24576
|
||||
eviction_timeout_s: 10.0
|
||||
services:
|
||||
vllm:
|
||||
max_mb: 20480
|
||||
priority: 1
|
||||
ollama:
|
||||
max_mb: 18432
|
||||
priority: 1
|
||||
cf-vision:
|
||||
max_mb: 4096
|
||||
priority: 2
|
||||
shared: true
|
||||
max_concurrent: 6
|
||||
cf-stt:
|
||||
max_mb: 1200
|
||||
priority: 2
|
||||
shared: true
|
||||
max_concurrent: 4
|
||||
backend: parakeet-tdt
|
||||
cf-tts:
|
||||
max_mb: 1024
|
||||
priority: 2
|
||||
shared: true
|
||||
max_concurrent: 4
|
||||
cf-embed:
|
||||
max_mb: 512
|
||||
priority: 2
|
||||
shared: true
|
||||
max_concurrent: 8
|
||||
always_on: true
|
||||
cf-classify:
|
||||
max_mb: 512
|
||||
priority: 2
|
||||
shared: true
|
||||
max_concurrent: 8
|
||||
always_on: true
|
||||
comfyui:
|
||||
max_mb: 20480
|
||||
priority: 4
|
||||
model_size_hints:
|
||||
llm_max_params: 70b
|
||||
image_gen_max: flux-dev-fp16
|
||||
|
|
@ -0,0 +1,22 @@
|
|||
schema_version: 1
|
||||
name: single-gpu-2gb
|
||||
vram_total_mb: 2048
|
||||
eviction_timeout_s: 15.0
|
||||
services:
|
||||
ollama:
|
||||
max_mb: 1536
|
||||
priority: 1
|
||||
cf-vision:
|
||||
max_mb: 512
|
||||
priority: 2
|
||||
shared: true
|
||||
max_concurrent: 1
|
||||
cf-stt:
|
||||
max_mb: 200
|
||||
priority: 2
|
||||
shared: true
|
||||
max_concurrent: 1
|
||||
backend: moonshine
|
||||
model_size_hints:
|
||||
llm_max_params: 3b
|
||||
image_gen_max: none
|
||||
|
|
@ -0,0 +1,30 @@
|
|||
schema_version: 1
|
||||
name: single-gpu-4gb
|
||||
vram_total_mb: 4096
|
||||
eviction_timeout_s: 15.0
|
||||
services:
|
||||
ollama:
|
||||
max_mb: 3072
|
||||
priority: 1
|
||||
cf-vision:
|
||||
max_mb: 1024
|
||||
priority: 2
|
||||
shared: true
|
||||
max_concurrent: 1
|
||||
cf-stt:
|
||||
max_mb: 600
|
||||
priority: 2
|
||||
shared: true
|
||||
max_concurrent: 1
|
||||
backend: faster-whisper
|
||||
cf-tts:
|
||||
max_mb: 512
|
||||
priority: 2
|
||||
shared: true
|
||||
max_concurrent: 1
|
||||
comfyui:
|
||||
max_mb: 3584
|
||||
priority: 4
|
||||
model_size_hints:
|
||||
llm_max_params: 3b
|
||||
image_gen_max: sd15-fp8
|
||||
|
|
@ -0,0 +1,33 @@
|
|||
schema_version: 1
|
||||
name: single-gpu-6gb
|
||||
vram_total_mb: 6144
|
||||
eviction_timeout_s: 10.0
|
||||
services:
|
||||
vllm:
|
||||
max_mb: 4096
|
||||
priority: 1
|
||||
ollama:
|
||||
max_mb: 3584
|
||||
priority: 1
|
||||
cf-vision:
|
||||
max_mb: 1536
|
||||
priority: 2
|
||||
shared: true
|
||||
max_concurrent: 2
|
||||
cf-stt:
|
||||
max_mb: 600
|
||||
priority: 2
|
||||
shared: true
|
||||
max_concurrent: 2
|
||||
backend: faster-whisper
|
||||
cf-tts:
|
||||
max_mb: 768
|
||||
priority: 2
|
||||
shared: true
|
||||
max_concurrent: 1
|
||||
comfyui:
|
||||
max_mb: 5120
|
||||
priority: 4
|
||||
model_size_hints:
|
||||
llm_max_params: 7b
|
||||
image_gen_max: sd15
|
||||
|
|
@ -0,0 +1,33 @@
|
|||
schema_version: 1
|
||||
name: single-gpu-8gb
|
||||
vram_total_mb: 8192
|
||||
eviction_timeout_s: 10.0
|
||||
services:
|
||||
vllm:
|
||||
max_mb: 5120
|
||||
priority: 1
|
||||
ollama:
|
||||
max_mb: 4096
|
||||
priority: 1
|
||||
cf-vision:
|
||||
max_mb: 2048
|
||||
priority: 2
|
||||
shared: true
|
||||
max_concurrent: 3
|
||||
cf-stt:
|
||||
max_mb: 1200
|
||||
priority: 2
|
||||
shared: true
|
||||
max_concurrent: 2
|
||||
backend: parakeet-tdt
|
||||
cf-tts:
|
||||
max_mb: 1024
|
||||
priority: 2
|
||||
shared: true
|
||||
max_concurrent: 2
|
||||
comfyui:
|
||||
max_mb: 6144
|
||||
priority: 4
|
||||
model_size_hints:
|
||||
llm_max_params: 8b
|
||||
image_gen_max: sdxl-fp8
|
||||
66
circuitforge_core/resources/profiles/schema.py
Normal file
66
circuitforge_core/resources/profiles/schema.py
Normal file
|
|
@ -0,0 +1,66 @@
|
|||
# circuitforge_core/resources/profiles/schema.py
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import yaml
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
SUPPORTED_SCHEMA_VERSION = 1
|
||||
|
||||
|
||||
class ServiceProfile(BaseModel):
    """VRAM budget and scheduling policy for one managed service."""

    max_mb: int  # VRAM ceiling in MiB (0 in CPU-only profiles)
    priority: int  # eviction ordering: lower value wins; higher-numbered leases are evicted first
    shared: bool = False  # presumably one loaded instance serves multiple callers — TODO confirm
    max_concurrent: int = 1  # concurrency cap when shared
    always_on: bool = False  # presumably the service stays resident rather than being unloaded — TODO confirm
    backend: str | None = None  # implementation hint, e.g. STT backend ("moonshine", "faster-whisper")
    consumers: list[str] = Field(default_factory=list)  # NOTE(review): not used by any visible code

    model_config = {"frozen": True}  # instances are immutable after validation
|
||||
|
||||
|
||||
class GpuNodeEntry(BaseModel):
    """One physical GPU within a NodeProfile."""

    id: int  # device index on the node
    vram_mb: int  # total VRAM of this device in MiB
    role: str  # free-form role label; semantics defined by profile authors
    card: str = "unknown"  # card model name, if known
    always_on: bool = False  # presumably marks the GPU as hosting non-evictable services — TODO confirm
    services: list[str] = Field(default_factory=list)  # service names assigned to this GPU

    model_config = {"frozen": True}  # instances are immutable after validation
|
||||
|
||||
|
||||
class NodeProfile(BaseModel):
    """Per-node section of a multi-node profile."""

    gpus: list[GpuNodeEntry]  # GPUs present on this node
    agent_url: str | None = None  # base URL of this node's agent, if configured
    nas_mount: str | None = None  # shared storage mount point, if any

    model_config = {"frozen": True}  # instances are immutable after validation
|
||||
|
||||
|
||||
class GpuProfile(BaseModel):
    """Top-level validated profile document (schema_version 1)."""

    schema_version: int  # must equal SUPPORTED_SCHEMA_VERSION; enforced by load_profile()
    name: str  # profile identifier, e.g. "single-gpu-8gb"
    vram_total_mb: int | None = None  # expected total VRAM; None for CPU-only profiles
    eviction_timeout_s: float = 10.0  # seconds to wait for a service to vacate VRAM
    services: dict[str, ServiceProfile] = Field(default_factory=dict)  # per-service budgets, keyed by service name
    model_size_hints: dict[str, str] = Field(default_factory=dict)  # e.g. {"llm_max_params": "8b"}
    nodes: dict[str, NodeProfile] = Field(default_factory=dict)  # optional multi-node layout, keyed by node id

    model_config = {"frozen": True}  # instances are immutable after validation
|
||||
|
||||
|
||||
def load_profile(path: Path) -> GpuProfile:
    """Load and validate a GPU profile YAML document.

    Args:
        path: Path to the profile YAML file.

    Returns:
        The validated ``GpuProfile``.

    Raises:
        ValueError: If the document is not a YAML mapping or declares a
            schema_version other than ``SUPPORTED_SCHEMA_VERSION``.
    """
    # Read explicitly as UTF-8: Path.read_text() otherwise uses the
    # locale-dependent default encoding, so the same profile file could
    # fail to parse on differently-configured hosts.
    raw: dict[str, Any] = yaml.safe_load(path.read_text(encoding="utf-8"))
    if not isinstance(raw, dict):
        raise ValueError(f"Profile file {path} must be a YAML mapping, got {type(raw).__name__}")
    version = raw.get("schema_version")
    if version != SUPPORTED_SCHEMA_VERSION:
        raise ValueError(
            f"Unsupported schema_version {version!r} in {path}. "
            f"Expected {SUPPORTED_SCHEMA_VERSION}."
        )
    return GpuProfile.model_validate(raw)
|
||||
|
|
@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
|
|||
|
||||
[project]
|
||||
name = "circuitforge-core"
|
||||
version = "0.1.0"
|
||||
version = "0.2.0"
|
||||
description = "Shared scaffold for CircuitForge products"
|
||||
requires-python = ">=3.11"
|
||||
dependencies = [
|
||||
|
|
@ -13,9 +13,29 @@ dependencies = [
|
|||
"openai>=1.0",
|
||||
]
|
||||
|
||||
[project.optional-dependencies]
|
||||
orch = [
|
||||
"fastapi>=0.110",
|
||||
"uvicorn[standard]>=0.29",
|
||||
"httpx>=0.27",
|
||||
"pydantic>=2.0",
|
||||
"typer[all]>=0.12",
|
||||
"psutil>=5.9",
|
||||
]
|
||||
dev = [
|
||||
"circuitforge-core[orch]",
|
||||
"pytest>=8.0",
|
||||
"pytest-asyncio>=0.23",
|
||||
"httpx>=0.27",
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
cf-orch = "circuitforge_core.resources.cli:app"
|
||||
|
||||
[tool.setuptools.packages.find]
|
||||
where = ["."]
|
||||
include = ["circuitforge_core*"]
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
testpaths = ["tests"]
|
||||
asyncio_mode = "auto"
|
||||
|
|
|
|||
0
tests/test_resources/__init__.py
Normal file
0
tests/test_resources/__init__.py
Normal file
68
tests/test_resources/test_agent_app.py
Normal file
68
tests/test_resources/test_agent_app.py
Normal file
|
|
@ -0,0 +1,68 @@
|
|||
from __future__ import annotations

import pytest
from unittest.mock import MagicMock
from fastapi.testclient import TestClient

from circuitforge_core.resources.agent.app import create_agent_app
from circuitforge_core.resources.models import GpuInfo
from circuitforge_core.resources.agent.eviction_executor import EvictionResult

# Fixed single-GPU snapshot returned by the mocked monitor in every test.
MOCK_GPUS = [
    GpuInfo(
        gpu_id=0,
        name="RTX 4000",
        vram_total_mb=8192,
        vram_used_mb=1024,
        vram_free_mb=7168,
    ),
]


@pytest.fixture
def agent_client():
    """Build an agent app with mocked monitor/executor.

    Returns a (TestClient, mock_monitor, mock_executor) triple so tests can
    both issue HTTP calls and assert on the mocks.
    """
    mock_monitor = MagicMock()
    mock_monitor.poll.return_value = MOCK_GPUS
    mock_executor = MagicMock()
    app = create_agent_app(
        node_id="heimdall",
        monitor=mock_monitor,
        executor=mock_executor,
    )
    return TestClient(app), mock_monitor, mock_executor


def test_health_returns_ok(agent_client):
    # /health reports liveness plus the node identity the app was created with.
    client, _, _ = agent_client
    resp = client.get("/health")
    assert resp.status_code == 200
    assert resp.json()["status"] == "ok"
    assert resp.json()["node_id"] == "heimdall"


def test_gpu_info_returns_gpu_list(agent_client):
    # /gpu-info surfaces the monitor's poll() result.
    client, _, _ = agent_client
    resp = client.get("/gpu-info")
    assert resp.status_code == 200
    data = resp.json()
    assert len(data["gpus"]) == 1
    assert data["gpus"][0]["gpu_id"] == 0
    assert data["gpus"][0]["name"] == "RTX 4000"
    assert data["gpus"][0]["vram_free_mb"] == 7168


def test_evict_calls_executor(agent_client):
    # POST /evict delegates to the executor with the request's pid/grace period.
    client, _, mock_executor = agent_client
    mock_executor.evict_pid.return_value = EvictionResult(
        success=True, method="sigterm", message="done"
    )
    resp = client.post("/evict", json={"pid": 1234, "grace_period_s": 5.0})
    assert resp.status_code == 200
    assert resp.json()["success"] is True
    mock_executor.evict_pid.assert_called_once_with(pid=1234, grace_period_s=5.0)


def test_evict_requires_pid(agent_client):
    # A request missing the required "pid" field fails validation with 422.
    client, _, _ = agent_client
    resp = client.post("/evict", json={"grace_period_s": 5.0})
    assert resp.status_code == 422
|
||||
33
tests/test_resources/test_cli.py
Normal file
33
tests/test_resources/test_cli.py
Normal file
|
|
@ -0,0 +1,33 @@
|
|||
from __future__ import annotations

from pathlib import Path
from unittest.mock import patch

from typer.testing import CliRunner

from circuitforge_core.resources.cli import app

runner = CliRunner()


def test_cli_help():
    # --help must succeed and print either the tool name or standard usage text.
    result = runner.invoke(app, ["--help"])
    assert result.exit_code == 0
    assert "cf-orch" in result.output.lower() or "Usage" in result.output


def test_status_command_shows_no_coordinator_message():
    # With the coordinator unreachable, `status` should fail or say so clearly.
    with patch("httpx.get", side_effect=ConnectionRefusedError("refused")):
        result = runner.invoke(app, ["status"])
        assert result.exit_code != 0 or "unreachable" in result.output.lower() \
            or "coordinator" in result.output.lower()


def test_install_service_creates_systemd_unit(tmp_path: Path):
    # --dry-run should report the unit it would write without touching the system;
    # the unit path is redirected into tmp_path via the module-level constant.
    unit_path = tmp_path / "cf-orch.service"
    with patch(
        "circuitforge_core.resources.cli._SYSTEMD_UNIT_PATH", unit_path
    ):
        result = runner.invoke(app, ["install-service", "--dry-run"])
        assert result.exit_code == 0
        assert "cf-orch.service" in result.output or "systemd" in result.output.lower()
|
||||
102
tests/test_resources/test_coordinator_app.py
Normal file
102
tests/test_resources/test_coordinator_app.py
Normal file
|
|
@ -0,0 +1,102 @@
|
|||
import pytest
from unittest.mock import MagicMock
from fastapi.testclient import TestClient
from circuitforge_core.resources.coordinator.app import create_coordinator_app
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
from circuitforge_core.resources.models import GpuInfo, NodeInfo


@pytest.fixture
def coordinator_client():
    """In-process coordinator with one registered 8 GiB GPU and a mocked supervisor.

    Returns (TestClient, LeaseManager) so tests can hit the HTTP API and
    inspect lease state directly.
    """
    lease_manager = LeaseManager()
    lease_manager.register_gpu("heimdall", 0, 8192)
    profile_registry = ProfileRegistry()
    supervisor = MagicMock()
    supervisor.all_nodes.return_value = [
        NodeInfo(
            node_id="heimdall",
            agent_url="http://localhost:7701",
            gpus=[GpuInfo(gpu_id=0, name="RTX 4000",
                          vram_total_mb=8192, vram_used_mb=0, vram_free_mb=8192)],
            last_heartbeat=0.0,
        )
    ]
    supervisor.get_node_info.return_value = NodeInfo(
        node_id="heimdall",
        agent_url="http://localhost:7701",
        gpus=[],
        last_heartbeat=0.0,
    )
    app = create_coordinator_app(
        lease_manager=lease_manager,
        profile_registry=profile_registry,
        agent_supervisor=supervisor,
    )
    return TestClient(app), lease_manager


def test_health_returns_ok(coordinator_client):
    client, _ = coordinator_client
    resp = client.get("/api/health")
    assert resp.status_code == 200
    assert resp.json()["status"] == "ok"


def test_get_nodes_returns_list(coordinator_client):
    # /api/nodes reflects the supervisor's all_nodes() view.
    client, _ = coordinator_client
    resp = client.get("/api/nodes")
    assert resp.status_code == 200
    nodes = resp.json()["nodes"]
    assert len(nodes) == 1
    assert nodes[0]["node_id"] == "heimdall"


def test_get_profiles_returns_public_profiles(coordinator_client):
    # The registry ships public YAML profiles; spot-check a well-known one.
    client, _ = coordinator_client
    resp = client.get("/api/profiles")
    assert resp.status_code == 200
    names = [p["name"] for p in resp.json()["profiles"]]
    assert "single-gpu-8gb" in names


def test_post_lease_grants_lease(coordinator_client):
    client, _ = coordinator_client
    resp = client.post("/api/leases", json={
        "node_id": "heimdall", "gpu_id": 0,
        "mb": 2048, "service": "peregrine", "priority": 1,
    })
    assert resp.status_code == 200
    data = resp.json()
    assert data["lease"]["mb_granted"] == 2048
    assert data["lease"]["holder_service"] == "peregrine"
    assert "lease_id" in data["lease"]


def test_delete_lease_releases_it(coordinator_client):
    # Grant, then release by id; the API acknowledges the release.
    client, _ = coordinator_client
    resp = client.post("/api/leases", json={
        "node_id": "heimdall", "gpu_id": 0,
        "mb": 2048, "service": "peregrine", "priority": 1,
    })
    lease_id = resp.json()["lease"]["lease_id"]
    del_resp = client.delete(f"/api/leases/{lease_id}")
    assert del_resp.status_code == 200
    assert del_resp.json()["released"] is True


def test_delete_unknown_lease_returns_404(coordinator_client):
    client, _ = coordinator_client
    resp = client.delete("/api/leases/nonexistent-id")
    assert resp.status_code == 404


def test_get_leases_returns_active_leases(coordinator_client):
    client, _ = coordinator_client
    client.post("/api/leases", json={
        "node_id": "heimdall", "gpu_id": 0,
        "mb": 1024, "service": "kiwi", "priority": 2,
    })
    resp = client.get("/api/leases")
    assert resp.status_code == 200
    assert len(resp.json()["leases"]) == 1
|
||||
67
tests/test_resources/test_eviction_engine.py
Normal file
67
tests/test_resources/test_eviction_engine.py
Normal file
|
|
@ -0,0 +1,67 @@
|
|||
import asyncio
import pytest
from unittest.mock import AsyncMock, patch
from circuitforge_core.resources.coordinator.eviction_engine import EvictionEngine
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager


@pytest.fixture
def lease_manager():
    mgr = LeaseManager()
    mgr.register_gpu("heimdall", 0, 8192)
    return mgr


@pytest.fixture
def engine(lease_manager):
    # Short timeout keeps the eviction-wait path fast in tests.
    return EvictionEngine(lease_manager=lease_manager, eviction_timeout_s=0.1)


@pytest.mark.asyncio
async def test_request_lease_grants_when_vram_available(engine, lease_manager):
    lease = await engine.request_lease(
        node_id="heimdall", gpu_id=0, mb=4096,
        service="peregrine", priority=1,
        agent_url="http://localhost:7701",
    )
    assert lease is not None
    assert lease.mb_granted == 4096


@pytest.mark.asyncio
async def test_request_lease_evicts_and_grants(engine, lease_manager):
    # Pre-fill with a low-priority lease
    big_lease = await lease_manager.try_grant(
        "heimdall", 0, 7000, "comfyui", priority=4
    )
    assert big_lease is not None

    # Mock the agent eviction call
    with patch(
        "circuitforge_core.resources.coordinator.eviction_engine.EvictionEngine._call_agent_evict",
        new_callable=AsyncMock,
    ) as mock_evict:
        mock_evict.return_value = True
        # Simulate the comfyui lease being released (as if the agent evicted it)
        asyncio.get_event_loop().call_later(
            0.05, lambda: asyncio.ensure_future(lease_manager.release(big_lease.lease_id))
        )
        lease = await engine.request_lease(
            node_id="heimdall", gpu_id=0, mb=4096,
            service="peregrine", priority=1,
            agent_url="http://localhost:7701",
        )
        assert lease is not None
        assert lease.holder_service == "peregrine"


@pytest.mark.asyncio
async def test_request_lease_returns_none_when_no_eviction_candidates(engine):
    await engine.lease_manager.try_grant("heimdall", 0, 6000, "vllm", priority=1)
    # Requesting 4GB but no lower-priority leases exist
    lease = await engine.request_lease(
        node_id="heimdall", gpu_id=0, mb=4096,
        service="kiwi", priority=2,
        agent_url="http://localhost:7701",
    )
    assert lease is None
|
||||
43
tests/test_resources/test_eviction_executor.py
Normal file
43
tests/test_resources/test_eviction_executor.py
Normal file
|
|
@ -0,0 +1,43 @@
|
|||
import signal
from unittest.mock import patch, call
import pytest
from circuitforge_core.resources.agent.eviction_executor import EvictionExecutor, EvictionResult


def test_evict_by_pid_sends_sigterm_then_sigkill():
    executor = EvictionExecutor(grace_period_s=0.01)
    # pid_exists always True → grace period expires → SIGKILL fires
    with patch("os.kill") as mock_kill, \
            patch("circuitforge_core.resources.agent.eviction_executor.psutil") as mock_psutil:
        mock_psutil.pid_exists.return_value = True
        result = executor.evict_pid(pid=1234, grace_period_s=0.01)

    # Both signals must have been delivered, TERM first as a courtesy.
    assert result.success is True
    calls = mock_kill.call_args_list
    assert call(1234, signal.SIGTERM) in calls
    assert call(1234, signal.SIGKILL) in calls


def test_evict_pid_succeeds_on_sigterm_alone():
    executor = EvictionExecutor(grace_period_s=0.1)
    with patch("os.kill"), \
            patch("circuitforge_core.resources.agent.eviction_executor.psutil") as mock_psutil:
        mock_psutil.pid_exists.side_effect = [True, False]  # gone after SIGTERM
        result = executor.evict_pid(pid=5678, grace_period_s=0.01)
    assert result.success is True
    assert result.method == "sigterm"


def test_evict_pid_not_found_returns_failure():
    # Unknown pid is reported as a failure, not raised as an exception.
    executor = EvictionExecutor()
    with patch("circuitforge_core.resources.agent.eviction_executor.psutil") as mock_psutil:
        mock_psutil.pid_exists.return_value = False
        result = executor.evict_pid(pid=9999)
    assert result.success is False
    assert "not found" in result.message.lower()


def test_eviction_result_is_immutable():
    # EvictionResult is frozen; assignment must raise.
    result = EvictionResult(success=True, method="sigterm", message="ok")
    with pytest.raises((AttributeError, TypeError)):
        result.success = False  # type: ignore
|
||||
60
tests/test_resources/test_gpu_monitor.py
Normal file
60
tests/test_resources/test_gpu_monitor.py
Normal file
|
|
@ -0,0 +1,60 @@
|
|||
from unittest.mock import patch
from circuitforge_core.resources.agent.gpu_monitor import GpuMonitor


# Two-GPU sample in the CSV field order the monitor parses:
# index, name, total MiB, used MiB, free MiB (presumably nvidia-smi
# --query-gpu CSV output — confirm against GpuMonitor's command line).
SAMPLE_NVIDIA_SMI_OUTPUT = (
    "0, Quadro RTX 4000, 8192, 6843, 1349\n"
    "1, Quadro RTX 4000, 8192, 721, 7471\n"
)


def test_parse_returns_list_of_gpu_info():
    monitor = GpuMonitor()
    with patch("circuitforge_core.resources.agent.gpu_monitor.subprocess.run") as mock_run:
        mock_run.return_value.returncode = 0
        mock_run.return_value.stdout = SAMPLE_NVIDIA_SMI_OUTPUT
        gpus = monitor.poll()
    assert len(gpus) == 2
    assert gpus[0].gpu_id == 0
    assert gpus[0].name == "Quadro RTX 4000"
    assert gpus[0].vram_total_mb == 8192
    assert gpus[0].vram_used_mb == 6843
    assert gpus[0].vram_free_mb == 1349


def test_parse_second_gpu():
    monitor = GpuMonitor()
    with patch("circuitforge_core.resources.agent.gpu_monitor.subprocess.run") as mock_run:
        mock_run.return_value.returncode = 0
        mock_run.return_value.stdout = SAMPLE_NVIDIA_SMI_OUTPUT
        gpus = monitor.poll()
    assert gpus[1].gpu_id == 1
    assert gpus[1].vram_used_mb == 721
    assert gpus[1].vram_free_mb == 7471


def test_poll_returns_empty_list_when_nvidia_smi_unavailable():
    # No nvidia-smi binary (FileNotFoundError) degrades to an empty GPU list.
    monitor = GpuMonitor()
    with patch("circuitforge_core.resources.agent.gpu_monitor.subprocess.run", side_effect=FileNotFoundError):
        gpus = monitor.poll()
    assert gpus == []


def test_poll_returns_empty_list_on_nonzero_exit():
    monitor = GpuMonitor()
    with patch("circuitforge_core.resources.agent.gpu_monitor.subprocess.run") as mock_run:
        mock_run.return_value.returncode = 1
        mock_run.return_value.stdout = ""
        gpus = monitor.poll()
    assert gpus == []


def test_poll_skips_malformed_lines():
    # A non-numeric field drops that line only; valid lines still parse.
    monitor = GpuMonitor()
    malformed = "0, RTX 4000, 8192, not_a_number, 1024\n1, RTX 4000, 8192, 512, 7680\n"
    with patch("circuitforge_core.resources.agent.gpu_monitor.subprocess.run") as mock_run:
        mock_run.return_value.returncode = 0
        mock_run.return_value.stdout = malformed
        gpus = monitor.poll()
    assert len(gpus) == 1
    assert gpus[0].gpu_id == 1
|
||||
219
tests/test_resources/test_integration.py
Normal file
219
tests/test_resources/test_integration.py
Normal file
|
|
@ -0,0 +1,219 @@
|
|||
"""Integration test: full lease → eviction → re-grant cycle.
|
||||
|
||||
Runs coordinator in-process (no subprocesses, no real nvidia-smi).
|
||||
Uses TestClient for HTTP, mocks AgentSupervisor to return fixed node state.
|
||||
"""
|
||||
import pytest
|
||||
from unittest.mock import MagicMock
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
|
||||
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
|
||||
from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor
|
||||
from circuitforge_core.resources.coordinator.app import create_coordinator_app
|
||||
from circuitforge_core.resources.models import GpuInfo, NodeInfo
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def system():
|
||||
"""Create an in-process coordinator system with 8GB GPU and mock supervisor."""
|
||||
lease_manager = LeaseManager()
|
||||
lease_manager.register_gpu("local", 0, 8192)
|
||||
|
||||
mock_supervisor = MagicMock(spec=AgentSupervisor)
|
||||
mock_supervisor.all_nodes.return_value = [
|
||||
NodeInfo(
|
||||
node_id="local",
|
||||
agent_url="http://localhost:7701",
|
||||
gpus=[GpuInfo(
|
||||
gpu_id=0,
|
||||
name="RTX 4000",
|
||||
vram_total_mb=8192,
|
||||
vram_used_mb=0,
|
||||
vram_free_mb=8192,
|
||||
)],
|
||||
last_heartbeat=0.0,
|
||||
)
|
||||
]
|
||||
mock_supervisor.get_node_info.return_value = NodeInfo(
|
||||
node_id="local",
|
||||
agent_url="http://localhost:7701",
|
||||
gpus=[],
|
||||
last_heartbeat=0.0,
|
||||
)
|
||||
|
||||
profile_registry = ProfileRegistry()
|
||||
app = create_coordinator_app(
|
||||
lease_manager=lease_manager,
|
||||
profile_registry=profile_registry,
|
||||
agent_supervisor=mock_supervisor,
|
||||
)
|
||||
client = TestClient(app)
|
||||
return client, lease_manager
|
||||
|
||||
|
||||
def test_full_lease_cycle(system):
|
||||
"""Test: grant, verify, release, verify gone."""
|
||||
client, _ = system
|
||||
|
||||
# Grant a lease
|
||||
resp = client.post("/api/leases", json={
|
||||
"node_id": "local",
|
||||
"gpu_id": 0,
|
||||
"mb": 4096,
|
||||
"service": "peregrine",
|
||||
"priority": 1,
|
||||
})
|
||||
assert resp.status_code == 200
|
||||
lease_data = resp.json()["lease"]
|
||||
lease_id = lease_data["lease_id"]
|
||||
assert lease_data["mb_granted"] == 4096
|
||||
assert lease_data["holder_service"] == "peregrine"
|
||||
|
||||
# Verify it appears in active leases
|
||||
resp = client.get("/api/leases")
|
||||
assert resp.status_code == 200
|
||||
leases = resp.json()["leases"]
|
||||
assert any(l["lease_id"] == lease_id for l in leases)
|
||||
|
||||
# Release it
|
||||
resp = client.delete(f"/api/leases/{lease_id}")
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["released"] is True
|
||||
|
||||
# Verify it's gone
|
||||
resp = client.get("/api/leases")
|
||||
assert resp.status_code == 200
|
||||
leases = resp.json()["leases"]
|
||||
assert not any(l["lease_id"] == lease_id for l in leases)
|
||||
|
||||
|
||||
def test_vram_exhaustion_returns_503(system):
|
||||
"""Test: fill GPU, then request with no eviction candidates returns 503."""
|
||||
client, _ = system
|
||||
|
||||
# Fill GPU 0 with high-priority lease
|
||||
resp = client.post("/api/leases", json={
|
||||
"node_id": "local",
|
||||
"gpu_id": 0,
|
||||
"mb": 8000,
|
||||
"service": "vllm",
|
||||
"priority": 1,
|
||||
})
|
||||
assert resp.status_code == 200
|
||||
|
||||
# Try to get more VRAM with same priority (no eviction candidates)
|
||||
resp = client.post("/api/leases", json={
|
||||
"node_id": "local",
|
||||
"gpu_id": 0,
|
||||
"mb": 2000,
|
||||
"service": "kiwi",
|
||||
"priority": 1,
|
||||
})
|
||||
assert resp.status_code == 503
|
||||
assert "Insufficient VRAM" in resp.json()["detail"]
|
||||
|
||||
|
||||
def test_auto_detect_profile_for_8gb():
|
||||
"""Test: ProfileRegistry auto-detects single-gpu-8gb for 8GB GPU."""
|
||||
registry = ProfileRegistry()
|
||||
gpu = GpuInfo(
|
||||
gpu_id=0,
|
||||
name="RTX 4000",
|
||||
vram_total_mb=8192,
|
||||
vram_used_mb=0,
|
||||
vram_free_mb=8192,
|
||||
)
|
||||
profile = registry.auto_detect([gpu])
|
||||
assert profile.name == "single-gpu-8gb"
|
||||
# Verify profile has services configured
|
||||
assert hasattr(profile, "services")
|
||||
|
||||
|
||||
def test_node_endpoint_shows_nodes(system):
|
||||
"""Test: GET /api/nodes returns the mocked node."""
|
||||
client, _ = system
|
||||
resp = client.get("/api/nodes")
|
||||
assert resp.status_code == 200
|
||||
nodes = resp.json()["nodes"]
|
||||
assert len(nodes) == 1
|
||||
assert nodes[0]["node_id"] == "local"
|
||||
assert nodes[0]["agent_url"] == "http://localhost:7701"
|
||||
assert len(nodes[0]["gpus"]) == 1
|
||||
assert nodes[0]["gpus"][0]["name"] == "RTX 4000"
|
||||
|
||||
|
||||
def test_profiles_endpoint_returns_public_profiles(system):
|
||||
"""Test: GET /api/profiles returns standard public profiles."""
|
||||
client, _ = system
|
||||
resp = client.get("/api/profiles")
|
||||
assert resp.status_code == 200
|
||||
profiles = resp.json()["profiles"]
|
||||
names = [p["name"] for p in profiles]
|
||||
# Verify common public profiles are present
|
||||
assert "single-gpu-8gb" in names
|
||||
assert "single-gpu-6gb" in names
|
||||
assert "single-gpu-2gb" in names
|
||||
|
||||
|
||||
def test_multiple_leases_tracked_independently(system):
|
||||
"""Test: multiple active leases are tracked correctly."""
|
||||
client, _ = system
|
||||
|
||||
# Grant lease 1
|
||||
resp1 = client.post("/api/leases", json={
|
||||
"node_id": "local",
|
||||
"gpu_id": 0,
|
||||
"mb": 2048,
|
||||
"service": "peregrine",
|
||||
"priority": 2,
|
||||
})
|
||||
assert resp1.status_code == 200
|
||||
lease1_id = resp1.json()["lease"]["lease_id"]
|
||||
|
||||
# Grant lease 2
|
||||
resp2 = client.post("/api/leases", json={
|
||||
"node_id": "local",
|
||||
"gpu_id": 0,
|
||||
"mb": 2048,
|
||||
"service": "kiwi",
|
||||
"priority": 2,
|
||||
})
|
||||
assert resp2.status_code == 200
|
||||
lease2_id = resp2.json()["lease"]["lease_id"]
|
||||
|
||||
# Both should be in active leases
|
||||
resp = client.get("/api/leases")
|
||||
leases = resp.json()["leases"]
|
||||
lease_ids = [l["lease_id"] for l in leases]
|
||||
assert lease1_id in lease_ids
|
||||
assert lease2_id in lease_ids
|
||||
assert len(leases) == 2
|
||||
|
||||
# Release lease 1
|
||||
resp = client.delete(f"/api/leases/{lease1_id}")
|
||||
assert resp.status_code == 200
|
||||
|
||||
# Only lease 2 should remain
|
||||
resp = client.get("/api/leases")
|
||||
leases = resp.json()["leases"]
|
||||
lease_ids = [l["lease_id"] for l in leases]
|
||||
assert lease1_id not in lease_ids
|
||||
assert lease2_id in lease_ids
|
||||
assert len(leases) == 1
|
||||
|
||||
|
||||
def test_delete_nonexistent_lease_returns_404(system):
|
||||
"""Test: deleting a nonexistent lease returns 404."""
|
||||
client, _ = system
|
||||
resp = client.delete("/api/leases/nonexistent-lease-id")
|
||||
assert resp.status_code == 404
|
||||
assert "not found" in resp.json()["detail"]
|
||||
|
||||
|
||||
def test_health_endpoint_returns_ok(system):
|
||||
"""Test: GET /api/health returns status ok."""
|
||||
client, _ = system
|
||||
resp = client.get("/api/health")
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["status"] == "ok"
|
||||
85
tests/test_resources/test_lease_manager.py
Normal file
85
tests/test_resources/test_lease_manager.py
Normal file
|
|
@ -0,0 +1,85 @@
|
|||
import pytest
|
||||
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mgr():
|
||||
m = LeaseManager()
|
||||
m.register_gpu(node_id="heimdall", gpu_id=0, total_mb=8192)
|
||||
return m
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_grant_succeeds_when_vram_available(mgr):
|
||||
lease = await mgr.try_grant(
|
||||
node_id="heimdall", gpu_id=0, mb=4096,
|
||||
service="peregrine", priority=1
|
||||
)
|
||||
assert lease is not None
|
||||
assert lease.mb_granted == 4096
|
||||
assert lease.node_id == "heimdall"
|
||||
assert lease.gpu_id == 0
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_grant_fails_when_vram_insufficient(mgr):
    """A second grant is refused once free VRAM drops below the request."""
    # Occupy 7000 MB of the 8192 MB GPU, leaving under 2000 MB free.
    await mgr.try_grant(
        node_id="heimdall", gpu_id=0, mb=7000, service="vllm", priority=1
    )
    denied = await mgr.try_grant(
        node_id="heimdall", gpu_id=0, mb=2000, service="kiwi", priority=2
    )
    assert denied is None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_release_frees_vram(mgr):
    """Releasing a lease returns its VRAM to the pool for new grants."""
    first = await mgr.try_grant(
        node_id="heimdall", gpu_id=0, mb=7000, service="vllm", priority=1
    )
    assert first is not None
    assert await mgr.release(first.lease_id) is True
    # The freed 7000 MB must be grantable again on the same GPU.
    second = await mgr.try_grant(
        node_id="heimdall", gpu_id=0, mb=7000, service="comfyui", priority=4
    )
    assert second is not None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_release_unknown_lease_returns_false(mgr):
    """Releasing an ID that was never granted reports False, not an error."""
    assert await mgr.release("nonexistent-id") is False
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_get_eviction_candidates_returns_lower_priority_leases(mgr):
    """Only holders with lower priority than the requester are evictable.

    Priority numbering here: smaller number = higher priority (ollama at 1
    must survive a priority-2 requester; comfyui at 4 may be evicted).
    """
    await mgr.try_grant(
        node_id="heimdall", gpu_id=0, mb=3000, service="comfyui", priority=4
    )
    await mgr.try_grant(
        node_id="heimdall", gpu_id=0, mb=2000, service="ollama", priority=1
    )
    candidates = mgr.get_eviction_candidates(
        node_id="heimdall", gpu_id=0, needed_mb=3000, requester_priority=2
    )
    assert [c.holder_service for c in candidates] == ["comfyui"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_list_leases_for_gpu(mgr):
    """list_leases reports every active lease held on the given GPU."""
    for service, mb, prio in (("peregrine", 1024, 1), ("kiwi", 512, 2)):
        await mgr.try_grant(
            node_id="heimdall", gpu_id=0, mb=mb, service=service, priority=prio
        )
    active = mgr.list_leases(node_id="heimdall", gpu_id=0)
    assert len(active) == 2
|
||||
|
||||
|
||||
def test_register_gpu_sets_total(mgr):
    """register_gpu records the GPU's total VRAM capacity (see fixture)."""
    total = mgr.gpu_total_mb("heimdall", 0)
    assert total == 8192
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_used_mb_tracks_grants():
    """used_mb reports the sum of VRAM held by all active grants on a GPU.

    Builds its own manager (no fixture) so the accounting starts from zero.
    Keyword arguments are used throughout for consistency with the other
    tests in this module, which all call try_grant/register_gpu by keyword.
    """
    mgr = LeaseManager()
    mgr.register_gpu(node_id="heimdall", gpu_id=0, total_mb=8192)
    await mgr.try_grant(node_id="heimdall", gpu_id=0, mb=3000,
                        service="a", priority=1)
    await mgr.try_grant(node_id="heimdall", gpu_id=0, mb=2000,
                        service="b", priority=2)
    # 3000 + 2000 granted -> 5000 MB in use.
    assert mgr.used_mb("heimdall", 0) == 5000
|
||||
tests/test_resources/test_models.py — new file, 47 lines
@@ -0,0 +1,47 @@
|
|||
import time
|
||||
import pytest
|
||||
from circuitforge_core.resources.models import VRAMLease, GpuInfo, NodeInfo
|
||||
|
||||
|
||||
def test_vram_lease_create_assigns_unique_ids():
    """Two leases created from identical arguments get distinct lease IDs."""
    common = dict(gpu_id=0, node_id="heimdall", mb=4096,
                  service="peregrine", priority=1)
    first = VRAMLease.create(**common)
    second = VRAMLease.create(**common)
    assert first.lease_id != second.lease_id
|
||||
|
||||
|
||||
def test_vram_lease_create_with_ttl_sets_expiry():
    """Passing ttl_s sets expires_at to creation time plus the TTL."""
    before = time.time()
    lease = VRAMLease.create(gpu_id=0, node_id="heimdall", mb=2048,
                             service="kiwi", priority=2, ttl_s=60.0)
    after = time.time()
    # Creation happened somewhere in [before, after], so the expiry must
    # land in that same window shifted forward by the 60 s TTL.
    assert before + 60.0 <= lease.expires_at <= after + 60.0
|
||||
|
||||
|
||||
def test_vram_lease_create_no_ttl_has_zero_expiry():
    """Omitting ttl_s leaves expires_at at the 0.0 sentinel (never expires)."""
    lease = VRAMLease.create(gpu_id=0, node_id="heimdall", mb=1024,
                             service="snipe", priority=2)
    assert lease.expires_at == 0.0
|
||||
|
||||
|
||||
def test_vram_lease_is_immutable():
    """Assigning to a lease field must raise rather than mutate it."""
    lease = VRAMLease.create(gpu_id=0, node_id="heimdall", mb=1024,
                             service="snipe", priority=2)
    # Frozen dataclasses raise FrozenInstanceError (an AttributeError
    # subclass); other immutability schemes may raise TypeError — accept both.
    with pytest.raises((AttributeError, TypeError)):
        lease.mb_granted = 999  # type: ignore
|
||||
|
||||
|
||||
def test_gpu_info_fields():
    """GpuInfo preserves the VRAM figures it was constructed with."""
    info = GpuInfo(
        gpu_id=0,
        name="RTX 4000",
        vram_total_mb=8192,
        vram_used_mb=2048,
        vram_free_mb=6144,
    )
    assert info.vram_free_mb == 6144
|
||||
|
||||
|
||||
def test_node_info_fields():
    """NodeInfo carries its node ID and the list of attached GPUs."""
    single_gpu = GpuInfo(
        gpu_id=0,
        name="RTX 4000",
        vram_total_mb=8192,
        vram_used_mb=0,
        vram_free_mb=8192,
    )
    node = NodeInfo(
        node_id="heimdall",
        agent_url="http://localhost:7701",
        gpus=[single_gpu],
        last_heartbeat=time.time(),
    )
    assert node.node_id == "heimdall"
    assert len(node.gpus) == 1
|
||||
tests/test_resources/test_profile_registry.py — new file, 101 lines
@@ -0,0 +1,101 @@
|
|||
# tests/test_resources/test_profile_registry.py
|
||||
import pytest
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from circuitforge_core.resources.profiles.schema import (
|
||||
GpuProfile, ServiceProfile, load_profile
|
||||
)
|
||||
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
|
||||
|
||||
|
||||
def test_load_8gb_profile(tmp_path):
    """load_profile parses a full 8 GB profile including per-service options."""
    yaml_content = """
schema_version: 1
name: single-gpu-8gb
vram_total_mb: 8192
eviction_timeout_s: 10.0
services:
  vllm:
    max_mb: 5120
    priority: 1
  cf-vision:
    max_mb: 2048
    priority: 2
    shared: true
    max_concurrent: 3
"""
    profile_file = tmp_path / "test.yaml"
    profile_file.write_text(yaml_content)
    profile = load_profile(profile_file)

    # Top-level profile fields.
    assert profile.name == "single-gpu-8gb"
    assert profile.schema_version == 1
    assert profile.vram_total_mb == 8192
    assert profile.eviction_timeout_s == 10.0
    # Per-service entries.
    assert "vllm" in profile.services
    vllm = profile.services["vllm"]
    assert vllm.max_mb == 5120
    assert vllm.priority == 1
    vision = profile.services["cf-vision"]
    assert vision.shared is True
    assert vision.max_concurrent == 3
|
||||
|
||||
|
||||
def test_load_profile_rejects_wrong_schema_version(tmp_path):
    """An unsupported schema_version is rejected with a ValueError naming it."""
    profile_file = tmp_path / "future.yaml"
    profile_file.write_text("schema_version: 99\nname: future\n")
    with pytest.raises(ValueError, match="schema_version"):
        load_profile(profile_file)
|
||||
|
||||
|
||||
def test_service_profile_defaults():
    """Optional ServiceProfile fields fall back to their defaults."""
    profile = ServiceProfile(max_mb=1024, priority=2)
    # Flags default off, concurrency to a single slot.
    assert profile.shared is False
    assert profile.always_on is False
    assert profile.max_concurrent == 1
    # No backend or downstream consumers unless configured.
    assert profile.backend is None
    assert profile.consumers == []
|
||||
|
||||
|
||||
def test_profile_registry_loads_public_profiles():
    """The registry ships with the three public single-GPU profiles."""
    names = {p.name for p in ProfileRegistry().list_public()}
    assert {"single-gpu-8gb", "single-gpu-6gb", "single-gpu-2gb"} <= names
|
||||
|
||||
|
||||
def test_profile_registry_auto_detect_selects_8gb():
    """An 8192 MB GPU maps to the single-gpu-8gb profile."""
    fake_gpu = MagicMock(vram_total_mb=8192)
    selected = ProfileRegistry().auto_detect([fake_gpu])
    assert selected.name == "single-gpu-8gb"
|
||||
|
||||
|
||||
def test_profile_registry_auto_detect_selects_6gb():
    """A 6144 MB GPU maps to the single-gpu-6gb profile."""
    fake_gpu = MagicMock(vram_total_mb=6144)
    selected = ProfileRegistry().auto_detect([fake_gpu])
    assert selected.name == "single-gpu-6gb"
|
||||
|
||||
|
||||
def test_profile_registry_auto_detect_selects_2gb():
    """A 2048 MB GPU maps to the single-gpu-2gb profile."""
    fake_gpu = MagicMock(vram_total_mb=2048)
    selected = ProfileRegistry().auto_detect([fake_gpu])
    assert selected.name == "single-gpu-2gb"
|
||||
|
||||
|
||||
def test_profile_registry_load_from_path(tmp_path):
    """Registry.load reads an arbitrary profile file from an explicit path."""
    profile_path = tmp_path / "custom.yaml"
    profile_path.write_text(
        "schema_version: 1\nname: custom\n"
        "vram_total_mb: 12288\neviction_timeout_s: 5.0\n"
    )
    loaded = ProfileRegistry().load(profile_path)
    assert loaded.name == "custom"
    assert loaded.vram_total_mb == 12288
|
||||
Loading…
Reference in a new issue