diff --git a/.gitignore b/.gitignore index f5ba6f6..7d7f62b 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,7 @@ __pycache__/ *.egg-info/ dist/ .pytest_cache/ +.superpowers/ + +# cf-orch private profiles (commit on personal/heimdall branch only) +circuitforge_core/resources/profiles/private/ diff --git a/circuitforge_core/resources/__init__.py b/circuitforge_core/resources/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/circuitforge_core/resources/agent/__init__.py b/circuitforge_core/resources/agent/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/circuitforge_core/resources/agent/app.py b/circuitforge_core/resources/agent/app.py new file mode 100644 index 0000000..820c79a --- /dev/null +++ b/circuitforge_core/resources/agent/app.py @@ -0,0 +1,60 @@ +from __future__ import annotations + +import logging +from typing import Any + +from fastapi import FastAPI +from pydantic import BaseModel + +from circuitforge_core.resources.agent.eviction_executor import EvictionExecutor +from circuitforge_core.resources.agent.gpu_monitor import GpuMonitor + +logger = logging.getLogger(__name__) + + +class EvictRequest(BaseModel): + pid: int + grace_period_s: float = 5.0 + + +def create_agent_app( + node_id: str, + monitor: GpuMonitor | None = None, + executor: EvictionExecutor | None = None, +) -> FastAPI: + _monitor = monitor or GpuMonitor() + _executor = executor or EvictionExecutor() + + app = FastAPI(title=f"cf-orch-agent [{node_id}]") + + @app.get("/health") + def health() -> dict[str, Any]: + return {"status": "ok", "node_id": node_id} + + @app.get("/gpu-info") + def gpu_info() -> dict[str, Any]: + gpus = _monitor.poll() + return { + "node_id": node_id, + "gpus": [ + { + "gpu_id": g.gpu_id, + "name": g.name, + "vram_total_mb": g.vram_total_mb, + "vram_used_mb": g.vram_used_mb, + "vram_free_mb": g.vram_free_mb, + } + for g in gpus + ], + } + + @app.post("/evict") + def evict(req: EvictRequest) -> dict[str, Any]: + result = _executor.evict_pid(pid=req.pid, grace_period_s=req.grace_period_s) + return { + "success": result.success, + "method": result.method, + "message": result.message, + } + + return app diff --git a/circuitforge_core/resources/agent/eviction_executor.py b/circuitforge_core/resources/agent/eviction_executor.py new file mode 100644 index 0000000..d6a7c8a --- /dev/null +++ b/circuitforge_core/resources/agent/eviction_executor.py @@ -0,0 +1,85 @@ +from __future__ import annotations + +import logging +import os +import signal +import time +from dataclasses import dataclass + +import psutil + +logger = logging.getLogger(__name__) + +_DEFAULT_GRACE_S = 5.0 + + +@dataclass(frozen=True) +class EvictionResult: + success: bool + method: str # "sigterm", "sigkill", "already_gone", "not_found", "error" + message: str + + +class EvictionExecutor: + def __init__(self, grace_period_s: float = _DEFAULT_GRACE_S) -> None: + self._default_grace = grace_period_s + + def evict_pid( + self, + pid: int, + grace_period_s: float | None = None, + ) -> EvictionResult: + grace = grace_period_s if grace_period_s is not None else self._default_grace + + if pid <= 0: + return EvictionResult( + success=False, method="error", + message=f"Refusing to signal invalid PID {pid}" + ) + + if not psutil.pid_exists(pid): + return EvictionResult( + success=False, method="not_found", + message=f"PID {pid} not found" + ) + + try: + os.kill(pid, signal.SIGTERM) + except ProcessLookupError: + return EvictionResult( + success=True, method="already_gone", + message=f"PID {pid} vanished before SIGTERM" + ) + except PermissionError as exc: + return EvictionResult( + success=False, method="error", + message=f"Permission denied terminating PID {pid}: {exc}" + ) + + # Wait for grace period + deadline = time.monotonic() + grace + while time.monotonic() < deadline: + if not psutil.pid_exists(pid): + logger.info("PID %d exited cleanly after SIGTERM", pid) + return EvictionResult( + success=True, method="sigterm", + message=f"PID {pid} exited after SIGTERM" + ) + time.sleep(0.05) + + # Escalate to SIGKILL + if psutil.pid_exists(pid): + try: + os.kill(pid, signal.SIGKILL) + logger.warning("PID %d required SIGKILL", pid) + return EvictionResult( + success=True, method="sigkill", + message=f"PID {pid} killed with SIGKILL" + ) + except ProcessLookupError: + pass + + return EvictionResult( + success=True, method="sigkill", + message=f"PID {pid} is gone" + ) diff --git a/circuitforge_core/resources/agent/gpu_monitor.py b/circuitforge_core/resources/agent/gpu_monitor.py new file mode 100644 index 0000000..4d058d6 --- /dev/null +++ b/circuitforge_core/resources/agent/gpu_monitor.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +import logging +import subprocess + +from circuitforge_core.resources.models import GpuInfo + +logger = logging.getLogger(__name__) + +_NVIDIA_SMI_CMD = [ + "nvidia-smi", + "--query-gpu=index,name,memory.total,memory.used,memory.free", + "--format=csv,noheader,nounits", +] + + +class GpuMonitor: + def poll(self) -> list[GpuInfo]: + try: + result = subprocess.run( + _NVIDIA_SMI_CMD, + capture_output=True, + text=True, + timeout=5, + ) + except (FileNotFoundError, subprocess.TimeoutExpired) as exc: + logger.warning("nvidia-smi unavailable: %s", exc) + return [] + + if result.returncode != 0: + logger.warning("nvidia-smi exited %d", result.returncode) + return [] + + return self._parse(result.stdout) + + def _parse(self, output: str) -> list[GpuInfo]: + gpus: list[GpuInfo] = [] + for line in output.strip().splitlines(): + parts = [p.strip() for p in line.split(",")] + if len(parts) != 5: + continue + try: + gpus.append(GpuInfo( + gpu_id=int(parts[0]), + name=parts[1], + vram_total_mb=int(parts[2]), + vram_used_mb=int(parts[3]), + vram_free_mb=int(parts[4]), + )) + except ValueError: + logger.debug("Skipping malformed nvidia-smi line: %r", line) + return gpus diff --git a/circuitforge_core/resources/cli.py b/circuitforge_core/resources/cli.py new file mode 100644 index 0000000..1e907a4 --- /dev/null +++ b/circuitforge_core/resources/cli.py @@ -0,0 +1,143 @@ +from __future__ import annotations + +import sys +from pathlib import Path +from typing import Annotated, Optional + +import typer +import uvicorn + +app = typer.Typer(name="cf-orch", help="CircuitForge GPU resource orchestrator") + +_SYSTEMD_UNIT_PATH = Path("/etc/systemd/system/cf-orch.service") + +_SYSTEMD_UNIT_TEMPLATE = """\ +[Unit] +Description=CircuitForge GPU Resource Orchestrator +After=network.target + +[Service] +Type=simple +ExecStart={python} -m circuitforge_core.resources.cli start +Restart=on-failure +RestartSec=5 + +[Install] +WantedBy=multi-user.target +""" + + +@app.command() +def start( + profile: Annotated[Optional[Path], typer.Option(help="Profile YAML path")] = None, + host: str = "0.0.0.0", + port: int = 7700, + agent_port: int = 7701, +) -> None: + """Start the cf-orch coordinator (auto-detects GPU profile if not specified).""" + from circuitforge_core.resources.coordinator.lease_manager import LeaseManager + from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry + from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor + from circuitforge_core.resources.coordinator.app import create_coordinator_app + from circuitforge_core.resources.agent.gpu_monitor import GpuMonitor + + lease_manager = LeaseManager() + profile_registry = ProfileRegistry() + supervisor = AgentSupervisor(lease_manager=lease_manager) + + monitor = GpuMonitor() + gpus = monitor.poll() + if not gpus: + typer.echo( + "Warning: no GPUs detected via nvidia-smi — coordinator running with 0 VRAM" + ) + else: + for gpu in gpus: + lease_manager.register_gpu("local", gpu.gpu_id, gpu.vram_total_mb) + typer.echo(f"Detected {len(gpus)} GPU(s)") + + if profile: + active_profile = profile_registry.load(profile) + typer.echo(f"Using profile: {active_profile.name} (from {profile})") + else: + active_profile = ( + profile_registry.auto_detect(gpus) + if gpus + else profile_registry.list_public()[-1] + ) + typer.echo(f"Auto-selected profile: {active_profile.name}") + + coordinator_app = create_coordinator_app( + lease_manager=lease_manager, + profile_registry=profile_registry, + agent_supervisor=supervisor, + ) + + typer.echo(f"Starting cf-orch coordinator on {host}:{port}") + uvicorn.run(coordinator_app, host=host, port=port) + + +@app.command() +def agent( + coordinator: str = "http://localhost:7700", + node_id: str = "local", + host: str = "0.0.0.0", + port: int = 7701, +) -> None: + """Start a cf-orch node agent (for remote nodes like Navi, Huginn).""" + from circuitforge_core.resources.agent.app import create_agent_app + + agent_app = create_agent_app(node_id=node_id) + typer.echo(f"Starting cf-orch agent [{node_id}] on {host}:{port}") + uvicorn.run(agent_app, host=host, port=port) + + +@app.command() +def status(coordinator: str = "http://localhost:7700") -> None: + """Show GPU and lease status from the coordinator.""" + import httpx + + try: + resp = httpx.get(f"{coordinator}/api/nodes", timeout=5.0) + resp.raise_for_status() + nodes = resp.json().get("nodes", []) + for node in nodes: + typer.echo(f"\nNode: {node['node_id']}") + for gpu in node.get("gpus", []): + typer.echo( + f" GPU {gpu['gpu_id']}: {gpu['name']} — " + f"{gpu['vram_used_mb']}/{gpu['vram_total_mb']} MB used" + ) + except Exception as exc: + typer.echo(f"Coordinator unreachable at {coordinator}: {exc}", err=True) + raise typer.Exit(1) + + +@app.command("install-service") +def install_service( + dry_run: bool = typer.Option( + False, "--dry-run", help="Print unit file without writing" + ), +) -> None: + """Write a systemd unit file for cf-orch (requires root).""" + python = sys.executable + unit_content = _SYSTEMD_UNIT_TEMPLATE.format(python=python) + if dry_run: + typer.echo(f"Would write to {_SYSTEMD_UNIT_PATH}:\n") + typer.echo(unit_content) + return + try: + _SYSTEMD_UNIT_PATH.write_text(unit_content) + typer.echo(f"Written: {_SYSTEMD_UNIT_PATH}") + typer.echo( + "Run: sudo systemctl daemon-reload && sudo systemctl enable --now cf-orch" + ) + except PermissionError: + typer.echo( + f"Permission denied writing to {_SYSTEMD_UNIT_PATH}. Run as root.", err=True + ) + raise typer.Exit(1) + + +if __name__ == "__main__": + app() diff --git a/circuitforge_core/resources/compose.yml b/circuitforge_core/resources/compose.yml new file mode 100644 index 0000000..2cb4345 --- /dev/null +++ b/circuitforge_core/resources/compose.yml @@ -0,0 +1,44 @@ +# circuitforge_core/resources/compose.yml +# One-command cf-orch deployment for Docker self-hosters: +# docker compose -f path/to/compose.yml up cf-orch-coordinator + +services: + cf-orch-coordinator: + image: python:3.12-slim + command: > + sh -c "pip install 'circuitforge-core[orch]' && + cf-orch start --host 0.0.0.0 --port 7700" + ports: + - "7700:7700" + volumes: + - /run/docker.sock:/var/run/docker.sock:ro + - cf-orch-data:/data + environment: + - CFORCH_PROFILE=${CFORCH_PROFILE:-} + restart: unless-stopped + devices: + - /dev/nvidia0:/dev/nvidia0 + - /dev/nvidiactl:/dev/nvidiactl + runtime: nvidia + + cf-orch-agent: + image: python:3.12-slim + command: > + sh -c "pip install 'circuitforge-core[orch]' && + cf-orch agent --coordinator http://cf-orch-coordinator:7700 + --node-id ${CFORCH_NODE_ID:-local} + --host 0.0.0.0 --port 7701" + ports: + - "7701:7701" + depends_on: + - cf-orch-coordinator + environment: + - CFORCH_NODE_ID=${CFORCH_NODE_ID:-local} + restart: unless-stopped + devices: + - /dev/nvidia0:/dev/nvidia0 + - /dev/nvidiactl:/dev/nvidiactl + runtime: nvidia + +volumes: + cf-orch-data: diff --git a/circuitforge_core/resources/coordinator/__init__.py b/circuitforge_core/resources/coordinator/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/circuitforge_core/resources/coordinator/agent_supervisor.py b/circuitforge_core/resources/coordinator/agent_supervisor.py new file mode 100644 index 0000000..63d92e7 --- /dev/null +++ b/circuitforge_core/resources/coordinator/agent_supervisor.py @@ -0,0 +1,101 @@ +from __future__ import annotations + +import asyncio +import logging +import time +from dataclasses import dataclass, field + +import httpx + +from circuitforge_core.resources.coordinator.lease_manager import LeaseManager +from circuitforge_core.resources.models import GpuInfo, NodeInfo + +logger = logging.getLogger(__name__) + +_HEARTBEAT_INTERVAL_S = 10.0 +_AGENT_TIMEOUT_S = 5.0 + + +@dataclass +class AgentRecord: + node_id: str + agent_url: str + last_seen: float = field(default_factory=time.time) + gpus: list[GpuInfo] = field(default_factory=list) + online: bool = False + + +class AgentSupervisor: + def __init__(self, lease_manager: LeaseManager) -> None: + self._agents: dict[str, AgentRecord] = {} + self._lease_manager = lease_manager + self._running = False + + def register(self, node_id: str, agent_url: str) -> None: + if node_id not in self._agents: + self._agents[node_id] = AgentRecord(node_id=node_id, agent_url=agent_url) + logger.info("Registered agent node: %s @ %s", node_id, agent_url) + + def get_node_info(self, node_id: str) -> NodeInfo | None: + record = self._agents.get(node_id) + if record is None: + return None + return NodeInfo( + node_id=record.node_id, + agent_url=record.agent_url, + gpus=record.gpus, + last_heartbeat=record.last_seen, + ) + + def all_nodes(self) -> list[NodeInfo]: + return [ + NodeInfo( + node_id=r.node_id, + agent_url=r.agent_url, + gpus=r.gpus, + last_heartbeat=r.last_seen, + ) + for r in self._agents.values() + ] + + async def poll_agent(self, node_id: str) -> bool: + record = self._agents.get(node_id) + if record is None: + return False + try: + async with httpx.AsyncClient(timeout=_AGENT_TIMEOUT_S) as client: + resp = await client.get(f"{record.agent_url}/gpu-info") + resp.raise_for_status() + data = resp.json() + gpus = [ + GpuInfo( + gpu_id=g["gpu_id"], + name=g["name"], + vram_total_mb=g["vram_total_mb"], + vram_used_mb=g["vram_used_mb"], + vram_free_mb=g["vram_free_mb"], + ) + for g in data.get("gpus", []) + ] + record.gpus = gpus + record.last_seen = time.time() + record.online = True + for gpu in gpus: + self._lease_manager.register_gpu(node_id, gpu.gpu_id, gpu.vram_total_mb) + return True + except Exception as exc: + logger.warning("Agent %s unreachable: %s", node_id, exc) + record.online = False + return False + + async def poll_all(self) -> None: + await asyncio.gather(*[self.poll_agent(nid) for nid in self._agents]) + + async def run_heartbeat_loop(self) -> None: + self._running = True + while self._running: + await self.poll_all() + await asyncio.sleep(_HEARTBEAT_INTERVAL_S) + + def stop(self) -> None: + self._running = False diff --git a/circuitforge_core/resources/coordinator/app.py b/circuitforge_core/resources/coordinator/app.py new file mode 100644 index 0000000..c25d061 --- /dev/null +++ b/circuitforge_core/resources/coordinator/app.py @@ -0,0 +1,129 @@ +from __future__ import annotations + +from typing import Any + +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel + +from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor +from circuitforge_core.resources.coordinator.eviction_engine import EvictionEngine +from circuitforge_core.resources.coordinator.lease_manager import LeaseManager +from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry + + +class LeaseRequest(BaseModel): + node_id: str + gpu_id: int + mb: int + service: str + priority: int = 2 + ttl_s: float = 0.0 + + +def create_coordinator_app( + lease_manager: LeaseManager, + profile_registry: ProfileRegistry, + agent_supervisor: AgentSupervisor, +) -> FastAPI: + eviction_engine = EvictionEngine(lease_manager=lease_manager) + + app = FastAPI(title="cf-orch-coordinator") + + @app.get("/api/health") + def health() -> dict[str, Any]: + return {"status": "ok"} + + @app.get("/api/nodes") + def get_nodes() -> dict[str, Any]: + nodes = agent_supervisor.all_nodes() + return { + "nodes": [ + { + "node_id": n.node_id, + "agent_url": n.agent_url, + "last_heartbeat": n.last_heartbeat, + "gpus": [ + { + "gpu_id": g.gpu_id, + "name": g.name, + "vram_total_mb": g.vram_total_mb, + "vram_used_mb": g.vram_used_mb, + "vram_free_mb": g.vram_free_mb, + } + for g in n.gpus + ], + } + for n in nodes + ] + } + + @app.get("/api/profiles") + def get_profiles() -> dict[str, Any]: + return { + "profiles": [ + {"name": p.name, "vram_total_mb": p.vram_total_mb} + for p in profile_registry.list_public() + ] + } + + @app.get("/api/leases") + def get_leases() -> dict[str, Any]: + return { + "leases": [ + { + "lease_id": lease.lease_id, + "node_id": lease.node_id, + "gpu_id": lease.gpu_id, + "mb_granted": lease.mb_granted, + "holder_service": lease.holder_service, + "priority": lease.priority, + "expires_at": lease.expires_at, + } + for lease in lease_manager.all_leases() + ] + } + + @app.post("/api/leases") + async def request_lease(req: LeaseRequest) -> dict[str, Any]: + node_info = agent_supervisor.get_node_info(req.node_id) + if node_info is None: + raise HTTPException( + status_code=422, + detail=f"Unknown node_id {req.node_id!r} — node not registered", + ) + agent_url = node_info.agent_url + + lease = await eviction_engine.request_lease( + node_id=req.node_id, + gpu_id=req.gpu_id, + mb=req.mb, + service=req.service, + priority=req.priority, + agent_url=agent_url, + ttl_s=req.ttl_s, + ) + if lease is None: + raise HTTPException( + status_code=503, + detail="Insufficient VRAM — no eviction candidates available", + ) + return { + "lease": { + "lease_id": lease.lease_id, + "node_id": lease.node_id, + "gpu_id": lease.gpu_id, + "mb_granted": lease.mb_granted, + "holder_service": lease.holder_service, + "priority": lease.priority, + "expires_at": lease.expires_at, + } + } + + @app.delete("/api/leases/{lease_id}") + async def release_lease(lease_id: str) -> dict[str, Any]: + released = await lease_manager.release(lease_id) + if not released: + raise HTTPException(status_code=404, detail=f"Lease {lease_id!r} not found") + return {"released": True, "lease_id": lease_id} + + return app diff --git a/circuitforge_core/resources/coordinator/eviction_engine.py b/circuitforge_core/resources/coordinator/eviction_engine.py new file mode 100644 index 0000000..db85774 --- /dev/null +++ b/circuitforge_core/resources/coordinator/eviction_engine.py @@ -0,0 +1,81 @@ +from __future__ import annotations + +import asyncio +import logging + +from circuitforge_core.resources.coordinator.lease_manager import LeaseManager +from circuitforge_core.resources.models import VRAMLease + +logger = logging.getLogger(__name__) + +_DEFAULT_EVICTION_TIMEOUT_S = 10.0 + + +class EvictionEngine: + def __init__( + self, + lease_manager: LeaseManager, + eviction_timeout_s: float = _DEFAULT_EVICTION_TIMEOUT_S, + ) -> None: + self.lease_manager = lease_manager + self._timeout = eviction_timeout_s + + async def request_lease( + self, + node_id: str, + gpu_id: int, + mb: int, + service: str, + priority: int, + agent_url: str, + ttl_s: float = 0.0, + ) -> VRAMLease | None: + # Fast path: enough free VRAM + lease = await self.lease_manager.try_grant( + node_id, gpu_id, mb, service, priority, ttl_s + ) + if lease is not None: + return lease + + # Find eviction candidates + candidates = self.lease_manager.get_eviction_candidates( + node_id=node_id, gpu_id=gpu_id, + needed_mb=mb, requester_priority=priority, + ) + if not candidates: + logger.info( + "No eviction candidates for %s on %s:GPU%d (%dMB needed)", + service, node_id, gpu_id, mb, + ) + return None + + # Evict candidates + freed_mb = sum(c.mb_granted for c in candidates) + logger.info( + "Evicting %d lease(s) to free %dMB for %s", + len(candidates), freed_mb, service, + ) + for candidate in candidates: + await self._evict_lease(candidate, agent_url) + + # Wait for evictions to free up VRAM (poll with timeout) + loop = asyncio.get_running_loop() + deadline = loop.time() + self._timeout + while loop.time() < deadline: + lease = await self.lease_manager.try_grant( + node_id, gpu_id, mb, service, priority, ttl_s + ) + if lease is not None: + return lease + await asyncio.sleep(0.1) + + logger.warning("Eviction timed out for %s after %.1fs", service, self._timeout) + return None + + async def _evict_lease(self, lease: VRAMLease, agent_url: str) -> None: + """Release lease accounting. Process-level eviction deferred to Plan B.""" + await self.lease_manager.release(lease.lease_id) + + async def _call_agent_evict(self, agent_url: str, lease: VRAMLease) -> bool: + """POST /evict to the agent. Stub for v1 — real process lookup in Plan B.""" + return True diff --git a/circuitforge_core/resources/coordinator/lease_manager.py b/circuitforge_core/resources/coordinator/lease_manager.py new file mode 100644 index 0000000..652b64b --- /dev/null +++ b/circuitforge_core/resources/coordinator/lease_manager.py @@ -0,0 +1,88 @@ +from __future__ import annotations + +import asyncio +from collections import defaultdict + +from circuitforge_core.resources.models import VRAMLease + + +class LeaseManager: + def __init__(self) -> None: + self._leases: dict[str, VRAMLease] = {} + self._gpu_total: dict[tuple[str, int], int] = {} + self._gpu_used: dict[tuple[str, int], int] = defaultdict(int) + self._lock = asyncio.Lock() + + def register_gpu(self, node_id: str, gpu_id: int, total_mb: int) -> None: + self._gpu_total[(node_id, gpu_id)] = total_mb + + def gpu_total_mb(self, node_id: str, gpu_id: int) -> int: + return self._gpu_total.get((node_id, gpu_id), 0) + + def used_mb(self, node_id: str, gpu_id: int) -> int: + return self._gpu_used[(node_id, gpu_id)] + + async def try_grant( + self, + node_id: str, + gpu_id: int, + mb: int, + service: str, + priority: int, + ttl_s: float = 0.0, + ) -> VRAMLease | None: + async with self._lock: + total = self._gpu_total.get((node_id, gpu_id), 0) + used = self._gpu_used[(node_id, gpu_id)] + if total - used < mb: + return None + lease = VRAMLease.create( + gpu_id=gpu_id, node_id=node_id, mb=mb, + service=service, priority=priority, ttl_s=ttl_s, + ) + self._leases[lease.lease_id] = lease + self._gpu_used[(node_id, gpu_id)] += mb + return lease + + async def release(self, lease_id: str) -> bool: + async with self._lock: + lease = self._leases.pop(lease_id, None) + if lease is None: + return False + self._gpu_used[(lease.node_id, lease.gpu_id)] -= lease.mb_granted + return True + + def get_eviction_candidates( + self, + node_id: str, + gpu_id: int, + needed_mb: int, + requester_priority: int, + ) -> list[VRAMLease]: + candidates = [ + lease for lease in self._leases.values() + if lease.node_id == node_id + and lease.gpu_id == gpu_id + and lease.priority > requester_priority + ] + candidates.sort(key=lambda lease: lease.priority, reverse=True) + selected: list[VRAMLease] = [] + freed = 0 + for candidate in candidates: + selected.append(candidate) + freed += candidate.mb_granted + if freed >= needed_mb: + break + return selected + + def list_leases( + self, node_id: str | None = None, gpu_id: int | None = None + ) -> list[VRAMLease]: + return [ + lease for lease in self._leases.values() + if (node_id is None or lease.node_id == node_id) + and (gpu_id is None or lease.gpu_id == gpu_id) + ] + + def all_leases(self) -> list[VRAMLease]: + return list(self._leases.values()) diff --git a/circuitforge_core/resources/coordinator/profile_registry.py b/circuitforge_core/resources/coordinator/profile_registry.py new file mode 100644 index 0000000..0310c44 --- /dev/null +++ b/circuitforge_core/resources/coordinator/profile_registry.py @@ -0,0 +1,65 @@ +# circuitforge_core/resources/coordinator/profile_registry.py +from __future__ import annotations + +import logging +from pathlib import Path + +from circuitforge_core.resources.models import GpuInfo +from circuitforge_core.resources.profiles.schema import GpuProfile, load_profile + +_PUBLIC_DIR = Path(__file__).parent.parent / "profiles" / "public" + +# VRAM thresholds for public profile selection (MB) +_PROFILE_THRESHOLDS = [ + (22000, "single-gpu-24gb"), + (14000, "single-gpu-16gb"), + (8000, "single-gpu-8gb"), + (5500, "single-gpu-6gb"), + (3500, "single-gpu-4gb"), + (0, "single-gpu-2gb"), +] + +_log = logging.getLogger(__name__) + + +class ProfileRegistry: + def __init__(self, extra_dirs: list[Path] | None = None) -> None: + self._profiles: dict[str, GpuProfile] = {} + self._load_dir(_PUBLIC_DIR) + for d in (extra_dirs or []): + if d.exists(): + self._load_dir(d) + + def _load_dir(self, directory: Path) -> None: + for yaml_file in directory.glob("*.yaml"): + try: + profile = load_profile(yaml_file) + self._profiles[profile.name] = profile + except Exception as exc: + _log.warning("Skipping %s: %s", yaml_file, exc) + + def load(self, path: Path) -> GpuProfile: + profile = load_profile(path) + self._profiles[profile.name] = profile + return profile + + def list_public(self) -> list[GpuProfile]: + # CPU profiles (cpu-*) are intentionally excluded — this endpoint + # is used to match GPU hardware. CPU inference nodes self-select + # their profile via the CLI and are not listed for lease matching. + return [ + p for p in self._profiles.values() + if p.name.startswith("single-gpu-") + ] + + def get(self, name: str) -> GpuProfile | None: + return self._profiles.get(name) + + def auto_detect(self, gpus: list[GpuInfo]) -> GpuProfile: + primary_vram = gpus[0].vram_total_mb if gpus else 0 + for threshold_mb, profile_name in _PROFILE_THRESHOLDS: + if primary_vram >= threshold_mb: + profile = self._profiles.get(profile_name) + if profile: + return profile + return self._profiles["single-gpu-2gb"] diff --git a/circuitforge_core/resources/models.py b/circuitforge_core/resources/models.py new file mode 100644 index 0000000..53b4886 --- /dev/null +++ b/circuitforge_core/resources/models.py @@ -0,0 +1,56 @@ +from __future__ import annotations + +import time +import uuid +from dataclasses import dataclass, field + + +@dataclass(frozen=True) +class VRAMLease: + lease_id: str + gpu_id: int + node_id: str + mb_granted: int + holder_service: str + priority: int + expires_at: float # unix timestamp; 0.0 = no expiry + + @classmethod + def create( + cls, + gpu_id: int, + node_id: str, + mb: int, + service: str, + priority: int, + ttl_s: float = 0.0, + ) -> VRAMLease: + return cls( + lease_id=str(uuid.uuid4()), + gpu_id=gpu_id, + node_id=node_id, + mb_granted=mb, + holder_service=service, + priority=priority, + expires_at=time.time() + ttl_s if ttl_s > 0.0 else 0.0, + ) + + def is_expired(self) -> bool: + return self.expires_at > 0.0 and time.time() > self.expires_at + + +@dataclass(frozen=True) +class GpuInfo: + gpu_id: int + name: str + vram_total_mb: int + vram_used_mb: int + vram_free_mb: int + + +@dataclass +class NodeInfo: + node_id: str + agent_url: str + gpus: list[GpuInfo] + last_heartbeat: float = field(default_factory=time.time) diff --git a/circuitforge_core/resources/profiles/__init__.py b/circuitforge_core/resources/profiles/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/circuitforge_core/resources/profiles/public/cpu-16gb.yaml b/circuitforge_core/resources/profiles/public/cpu-16gb.yaml new file mode 100644 index 0000000..75a22e8 --- /dev/null +++ b/circuitforge_core/resources/profiles/public/cpu-16gb.yaml @@ -0,0 +1,33 @@ +schema_version: 1 +name: cpu-16gb +eviction_timeout_s: 30.0 +services: + ollama: + max_mb: 0 + priority: 1 + cf-stt: + max_mb: 0 + priority: 2 + shared: true + max_concurrent: 1 + backend: moonshine + cf-tts: + max_mb: 0 + priority: 2 + shared: true + max_concurrent: 1 + cf-embed: + max_mb: 0 + priority: 2 + shared: true + max_concurrent: 2 + always_on: true + cf-classify: + max_mb: 0 + priority: 2 + shared: true + max_concurrent: 2 + always_on: true +model_size_hints: + llm_max_params: 3b-q4 + image_gen_max: none diff --git a/circuitforge_core/resources/profiles/public/cpu-32gb.yaml b/circuitforge_core/resources/profiles/public/cpu-32gb.yaml new file mode 100644 index 0000000..fb7c976 --- /dev/null +++ b/circuitforge_core/resources/profiles/public/cpu-32gb.yaml @@ -0,0 +1,33 @@ +schema_version: 1 +name: cpu-32gb +eviction_timeout_s: 30.0 +services: + ollama: + max_mb: 0 + priority: 1 + cf-stt: + max_mb: 0 + priority: 2 + shared: true + max_concurrent: 2 + backend: faster-whisper + cf-tts: + max_mb: 0 + priority: 2 + shared: true + max_concurrent: 2 + cf-embed: + max_mb: 0 + priority: 2 + shared: true + max_concurrent: 4 + always_on: true + cf-classify: + max_mb: 0 + priority: 2 + shared: true + max_concurrent: 4 + always_on: true +model_size_hints: + llm_max_params: 7b-q4 + image_gen_max: none diff --git a/circuitforge_core/resources/profiles/public/single-gpu-16gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-16gb.yaml new file mode 100644 index 0000000..7ad59f9 --- /dev/null +++ b/circuitforge_core/resources/profiles/public/single-gpu-16gb.yaml @@ -0,0 +1,45 @@ +schema_version: 1 +name: single-gpu-16gb +vram_total_mb: 16384 +eviction_timeout_s: 10.0 +services: + vllm: + max_mb: 12288 + priority: 1 + ollama: + max_mb: 12288 + priority: 1 + cf-vision: + max_mb: 3072 + priority: 2 + shared: true + max_concurrent: 4 + cf-stt: + max_mb: 1200 + priority: 2 + shared: true + max_concurrent: 3 + backend: parakeet-tdt + cf-tts: + max_mb: 1024 + priority: 2 + shared: true + max_concurrent: 3 + cf-embed: + max_mb: 512 + priority: 2 + shared: true + max_concurrent: 6 + always_on: true + cf-classify: + max_mb: 512 + priority: 2 + shared: true + max_concurrent: 6 + always_on: true + comfyui: + max_mb: 14336 + priority: 4 +model_size_hints: + llm_max_params: 34b + image_gen_max: flux-dev-fp8 diff --git a/circuitforge_core/resources/profiles/public/single-gpu-24gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-24gb.yaml new file mode 100644 index 0000000..4f98eb8 --- /dev/null +++ b/circuitforge_core/resources/profiles/public/single-gpu-24gb.yaml @@ -0,0 +1,45 @@ +schema_version: 1 +name: single-gpu-24gb +vram_total_mb: 24576 +eviction_timeout_s: 10.0 +services: + vllm: + max_mb: 20480 + priority: 1 + ollama: + max_mb: 18432 + priority: 1 + cf-vision: + max_mb: 4096 + priority: 2 + shared: true + max_concurrent: 6 + cf-stt: + max_mb: 1200 + priority: 2 + shared: true + max_concurrent: 4 + backend: parakeet-tdt + cf-tts: + max_mb: 1024 + priority: 2 + shared: true + max_concurrent: 4 + cf-embed: + max_mb: 512 + priority: 2 + shared: true + max_concurrent: 8 + always_on: true + cf-classify: + max_mb: 512 + priority: 2 + shared: true + max_concurrent: 8 + always_on: true + comfyui: + max_mb: 20480 + priority: 4 +model_size_hints: + llm_max_params: 70b + image_gen_max: flux-dev-fp16 diff --git a/circuitforge_core/resources/profiles/public/single-gpu-2gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-2gb.yaml new file mode 100644 index 0000000..d852eea --- /dev/null +++ b/circuitforge_core/resources/profiles/public/single-gpu-2gb.yaml @@ -0,0 +1,22 @@ +schema_version: 1 +name: single-gpu-2gb +vram_total_mb: 2048 +eviction_timeout_s: 15.0 +services: + ollama: + max_mb: 1536 + priority: 1 + cf-vision: + max_mb: 512 + priority: 2 + shared: true + max_concurrent: 1 + cf-stt: + max_mb: 200 + priority: 2 + shared: true + max_concurrent: 1 + backend: moonshine +model_size_hints: + llm_max_params: 3b + image_gen_max: none diff --git a/circuitforge_core/resources/profiles/public/single-gpu-4gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-4gb.yaml new file mode 100644 index 0000000..b7cb24e --- /dev/null +++ b/circuitforge_core/resources/profiles/public/single-gpu-4gb.yaml @@ -0,0 +1,30 @@ +schema_version: 1 +name: single-gpu-4gb +vram_total_mb: 4096 +eviction_timeout_s: 15.0 +services: + ollama: + max_mb: 3072 + priority: 1 + cf-vision: + max_mb: 1024 + priority: 2 + shared: true + max_concurrent: 1 + cf-stt: + max_mb: 600 + priority: 2 + shared: true + max_concurrent: 1 + backend: faster-whisper + cf-tts: + max_mb: 512 + priority: 2 + shared: true + max_concurrent: 1 + comfyui: + max_mb: 3584 + priority: 4 +model_size_hints: + llm_max_params: 3b + image_gen_max: sd15-fp8 diff --git a/circuitforge_core/resources/profiles/public/single-gpu-6gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-6gb.yaml new file mode 100644 index 0000000..92168ef --- /dev/null +++ b/circuitforge_core/resources/profiles/public/single-gpu-6gb.yaml @@ -0,0 +1,33 @@ +schema_version: 1 +name: single-gpu-6gb +vram_total_mb: 6144 +eviction_timeout_s: 10.0 +services: + vllm: + max_mb: 4096 + priority: 1 + ollama: + max_mb: 3584 + priority: 1 + cf-vision: + max_mb: 1536 + priority: 2 + shared: true + max_concurrent: 2 + cf-stt: + max_mb: 600 + priority: 2 + shared: true + max_concurrent: 2 + backend: faster-whisper + cf-tts: + max_mb: 768 + priority: 2 + shared: true + max_concurrent: 1 + comfyui: + max_mb: 5120 + priority: 4 +model_size_hints: + llm_max_params: 7b + image_gen_max: sd15 diff --git a/circuitforge_core/resources/profiles/public/single-gpu-8gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-8gb.yaml new file mode 100644 index 0000000..7053419 --- /dev/null +++ b/circuitforge_core/resources/profiles/public/single-gpu-8gb.yaml @@ -0,0 +1,33 @@ +schema_version: 1 +name: single-gpu-8gb +vram_total_mb: 8192 +eviction_timeout_s: 10.0 +services: + vllm: + max_mb: 5120 + priority: 1 + ollama: + max_mb: 4096 + priority: 1 + cf-vision: + max_mb: 2048 + priority: 2 + shared: true + max_concurrent: 3 + cf-stt: + max_mb: 1200 + priority: 2 + shared: true + max_concurrent: 2 + backend: parakeet-tdt + cf-tts: + max_mb: 1024 + priority: 2 + shared: true + max_concurrent: 2 + comfyui: + max_mb: 6144 + priority: 4 +model_size_hints: + llm_max_params: 8b + image_gen_max: sdxl-fp8 diff --git a/circuitforge_core/resources/profiles/schema.py b/circuitforge_core/resources/profiles/schema.py new file mode 100644 index 0000000..ac59020 --- /dev/null +++ b/circuitforge_core/resources/profiles/schema.py @@ -0,0 +1,66 @@ +# circuitforge_core/resources/profiles/schema.py +from __future__ import annotations + +from pathlib import Path +from typing import Any + +import yaml +from pydantic import BaseModel, Field + +SUPPORTED_SCHEMA_VERSION = 1 + + +class ServiceProfile(BaseModel): + max_mb: int + priority: int + shared: bool = False + max_concurrent: int = 1 + always_on: bool = False + backend: str | None = None + consumers: list[str] = Field(default_factory=list) + + model_config = {"frozen": True} + + +class GpuNodeEntry(BaseModel): + id: int + vram_mb: int + role: str + card: str = "unknown" + always_on: bool = False + services: list[str] = Field(default_factory=list) + + model_config = {"frozen": True} + + +class NodeProfile(BaseModel): + gpus: list[GpuNodeEntry] + agent_url: str | None = None + nas_mount: str | None = None + + model_config = {"frozen": True} + + +class GpuProfile(BaseModel): + schema_version: int + name: str + vram_total_mb: int | None = None + eviction_timeout_s: float = 10.0 + services: dict[str, ServiceProfile] = Field(default_factory=dict) + model_size_hints: dict[str, str] = Field(default_factory=dict) + nodes: dict[str, NodeProfile] = Field(default_factory=dict) + + model_config = {"frozen": True} + + +def load_profile(path: Path) -> GpuProfile: + raw: dict[str, Any] = yaml.safe_load(path.read_text()) + if not isinstance(raw, dict): + raise ValueError(f"Profile file {path} must be a YAML mapping, got {type(raw).__name__}") + version = raw.get("schema_version") + if version != SUPPORTED_SCHEMA_VERSION: + raise ValueError( + f"Unsupported schema_version {version!r} in {path}. " + f"Expected {SUPPORTED_SCHEMA_VERSION}." + ) + return GpuProfile.model_validate(raw) diff --git a/pyproject.toml b/pyproject.toml index 7215dcb..bbc3026 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "circuitforge-core" -version = "0.1.0" +version = "0.2.0" description = "Shared scaffold for CircuitForge products" requires-python = ">=3.11" dependencies = [ @@ -13,9 +13,29 @@ dependencies = [ "openai>=1.0", ] +[project.optional-dependencies] +orch = [ + "fastapi>=0.110", + "uvicorn[standard]>=0.29", + "httpx>=0.27", + "pydantic>=2.0", + "typer[all]>=0.12", + "psutil>=5.9", +] +dev = [ + "circuitforge-core[orch]", + "pytest>=8.0", + "pytest-asyncio>=0.23", + "httpx>=0.27", +] + +[project.scripts] +cf-orch = "circuitforge_core.resources.cli:app" + [tool.setuptools.packages.find] where = ["."] include = ["circuitforge_core*"] [tool.pytest.ini_options] testpaths = ["tests"] +asyncio_mode = "auto" diff --git a/tests/test_resources/__init__.py b/tests/test_resources/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_resources/test_agent_app.py b/tests/test_resources/test_agent_app.py new file mode 100644 index 0000000..b24c1aa --- /dev/null +++ b/tests/test_resources/test_agent_app.py @@ -0,0 +1,68 @@ +from __future__ import annotations + +import pytest +from unittest.mock import MagicMock +from fastapi.testclient import TestClient + +from circuitforge_core.resources.agent.app import create_agent_app +from circuitforge_core.resources.models import GpuInfo +from circuitforge_core.resources.agent.eviction_executor import EvictionResult + +MOCK_GPUS = [ + GpuInfo( + gpu_id=0, + name="RTX 4000", + vram_total_mb=8192, + vram_used_mb=1024, + vram_free_mb=7168, + ), +] + + +@pytest.fixture +def agent_client(): + mock_monitor = MagicMock() + mock_monitor.poll.return_value = MOCK_GPUS + mock_executor = MagicMock() + app = create_agent_app( + node_id="heimdall", + monitor=mock_monitor, + executor=mock_executor, + ) + return TestClient(app), mock_monitor, mock_executor + + +def test_health_returns_ok(agent_client): + client, _, _ = agent_client + resp = client.get("/health") + assert resp.status_code == 200 + assert resp.json()["status"] == "ok" + assert resp.json()["node_id"] == "heimdall" + + +def test_gpu_info_returns_gpu_list(agent_client): + client, _, _ = agent_client + resp = client.get("/gpu-info") + assert resp.status_code == 200 + data = resp.json() + assert len(data["gpus"]) == 1 + assert data["gpus"][0]["gpu_id"] == 0 + assert data["gpus"][0]["name"] == "RTX 4000" + assert data["gpus"][0]["vram_free_mb"] == 7168 + + +def test_evict_calls_executor(agent_client): + client, _, mock_executor = agent_client + mock_executor.evict_pid.return_value = EvictionResult( + success=True, method="sigterm", message="done" + ) + resp = client.post("/evict", json={"pid": 1234, "grace_period_s": 5.0}) + assert resp.status_code == 200 + assert resp.json()["success"] is True + mock_executor.evict_pid.assert_called_once_with(pid=1234, grace_period_s=5.0) + + +def test_evict_requires_pid(agent_client): + client, _, _ = agent_client + resp = client.post("/evict", json={"grace_period_s": 5.0}) + assert resp.status_code == 422 diff --git a/tests/test_resources/test_cli.py b/tests/test_resources/test_cli.py new file mode 100644 index 0000000..5ceb715 --- /dev/null +++ b/tests/test_resources/test_cli.py @@ -0,0 +1,33 @@ +from __future__ import annotations + +from pathlib import Path +from unittest.mock import patch + +from typer.testing import CliRunner + +from circuitforge_core.resources.cli import app + +runner = CliRunner() + + +def test_cli_help(): + result = runner.invoke(app, ["--help"]) + assert result.exit_code == 0 + assert "cf-orch" in result.output.lower() or "Usage" in result.output + + +def test_status_command_shows_no_coordinator_message(): + with patch("httpx.get", side_effect=ConnectionRefusedError("refused")): + result = runner.invoke(app, ["status"]) + assert result.exit_code != 0 or "unreachable" in result.output.lower() \ + or "coordinator" in result.output.lower() + + +def test_install_service_creates_systemd_unit(tmp_path: Path): + unit_path = tmp_path / "cf-orch.service" + with patch( + "circuitforge_core.resources.cli._SYSTEMD_UNIT_PATH", unit_path + ): + result = runner.invoke(app, ["install-service", "--dry-run"]) + assert result.exit_code == 0 + assert "cf-orch.service" in result.output or "systemd" in result.output.lower() diff --git a/tests/test_resources/test_coordinator_app.py b/tests/test_resources/test_coordinator_app.py new file mode 100644 index 0000000..1fb8ce3 --- /dev/null +++ b/tests/test_resources/test_coordinator_app.py @@ -0,0 +1,102 @@ +import pytest +from unittest.mock import MagicMock +from fastapi.testclient import TestClient +from circuitforge_core.resources.coordinator.app import create_coordinator_app +from circuitforge_core.resources.coordinator.lease_manager import LeaseManager +from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry +from circuitforge_core.resources.models import GpuInfo, NodeInfo + + +@pytest.fixture +def coordinator_client(): + lease_manager = LeaseManager() + lease_manager.register_gpu("heimdall", 0, 8192) + profile_registry = ProfileRegistry() + supervisor = MagicMock() + supervisor.all_nodes.return_value = [ + NodeInfo( + node_id="heimdall", + agent_url="http://localhost:7701", + gpus=[GpuInfo(gpu_id=0, name="RTX 4000", + vram_total_mb=8192, vram_used_mb=0, vram_free_mb=8192)], + last_heartbeat=0.0, + ) + ] + supervisor.get_node_info.return_value = NodeInfo( + node_id="heimdall", + agent_url="http://localhost:7701", + gpus=[], + last_heartbeat=0.0, + ) + app = create_coordinator_app( + lease_manager=lease_manager, + profile_registry=profile_registry, + agent_supervisor=supervisor, + ) + return TestClient(app), lease_manager + + +def test_health_returns_ok(coordinator_client): + client, _ = coordinator_client + resp = client.get("/api/health") + assert resp.status_code == 200 + assert resp.json()["status"] == "ok" + + +def test_get_nodes_returns_list(coordinator_client): + client, _ = coordinator_client + resp = client.get("/api/nodes") + assert resp.status_code == 200 + nodes = resp.json()["nodes"] + assert len(nodes) == 1 + assert nodes[0]["node_id"] == "heimdall" + + +def test_get_profiles_returns_public_profiles(coordinator_client): + client, _ = coordinator_client + resp = client.get("/api/profiles") + assert resp.status_code == 200 + names = [p["name"] for p in resp.json()["profiles"]] + assert "single-gpu-8gb" in names + + +def test_post_lease_grants_lease(coordinator_client): + client, _ = coordinator_client + resp = client.post("/api/leases", json={ + "node_id": "heimdall", "gpu_id": 0, + "mb": 2048, "service": "peregrine", "priority": 1, + }) + assert resp.status_code == 200 + data = resp.json() + assert data["lease"]["mb_granted"] == 2048 + assert data["lease"]["holder_service"] == "peregrine" + assert "lease_id" in data["lease"] + + +def test_delete_lease_releases_it(coordinator_client): + client, _ = coordinator_client + resp = client.post("/api/leases", json={ + "node_id": "heimdall", "gpu_id": 0, + "mb": 2048, "service": "peregrine", "priority": 1, + }) + lease_id = resp.json()["lease"]["lease_id"] + del_resp = client.delete(f"/api/leases/{lease_id}") + assert del_resp.status_code == 200 + assert del_resp.json()["released"] is True + + +def test_delete_unknown_lease_returns_404(coordinator_client): + client, _ = coordinator_client + resp = client.delete("/api/leases/nonexistent-id") + assert resp.status_code == 404 + + +def test_get_leases_returns_active_leases(coordinator_client): + client, _ = coordinator_client + client.post("/api/leases", json={ + "node_id": "heimdall", "gpu_id": 0, + "mb": 1024, "service": "kiwi", "priority": 2, + }) + resp = client.get("/api/leases") + assert resp.status_code == 200 + assert len(resp.json()["leases"]) == 1 diff --git a/tests/test_resources/test_eviction_engine.py b/tests/test_resources/test_eviction_engine.py new file mode 100644 index 0000000..d7051e3 --- /dev/null +++ b/tests/test_resources/test_eviction_engine.py @@ -0,0 +1,67 @@ +import asyncio +import pytest +from unittest.mock import AsyncMock, patch +from circuitforge_core.resources.coordinator.eviction_engine import EvictionEngine +from circuitforge_core.resources.coordinator.lease_manager import LeaseManager + + +@pytest.fixture +def lease_manager(): + mgr = LeaseManager() + mgr.register_gpu("heimdall", 0, 8192) + return mgr + + +@pytest.fixture +def engine(lease_manager): + return EvictionEngine(lease_manager=lease_manager, eviction_timeout_s=0.1) + + +@pytest.mark.asyncio +async def test_request_lease_grants_when_vram_available(engine, lease_manager): + lease = await engine.request_lease( + node_id="heimdall", gpu_id=0, mb=4096, + service="peregrine", priority=1, + agent_url="http://localhost:7701", + ) + assert lease is not None + assert lease.mb_granted == 4096 + + +@pytest.mark.asyncio +async def test_request_lease_evicts_and_grants(engine, lease_manager): + # Pre-fill with a low-priority lease + big_lease = await lease_manager.try_grant( + "heimdall", 0, 7000, "comfyui", priority=4 + ) + assert big_lease is not None + + # Mock the agent eviction call + with patch( + "circuitforge_core.resources.coordinator.eviction_engine.EvictionEngine._call_agent_evict", + new_callable=AsyncMock, + ) as mock_evict: + mock_evict.return_value = True + # Simulate the comfyui lease being released (as if the agent evicted it) + asyncio.get_event_loop().call_later( + 0.05, lambda: asyncio.ensure_future(lease_manager.release(big_lease.lease_id)) + ) + lease = await engine.request_lease( + node_id="heimdall", gpu_id=0, mb=4096, + service="peregrine", priority=1, + agent_url="http://localhost:7701", + ) + assert lease is not None + assert lease.holder_service == "peregrine" + + +@pytest.mark.asyncio +async def test_request_lease_returns_none_when_no_eviction_candidates(engine): + await engine.lease_manager.try_grant("heimdall", 0, 6000, "vllm", priority=1) + # Requesting 4GB but no lower-priority leases exist + lease = await engine.request_lease( + node_id="heimdall", gpu_id=0, mb=4096, + service="kiwi", priority=2, + agent_url="http://localhost:7701", + ) + assert lease is None diff --git a/tests/test_resources/test_eviction_executor.py b/tests/test_resources/test_eviction_executor.py new file mode 100644 index 0000000..d718732 --- /dev/null +++ b/tests/test_resources/test_eviction_executor.py @@ -0,0 +1,43 @@ +import signal +from unittest.mock import patch, call +import pytest +from circuitforge_core.resources.agent.eviction_executor import EvictionExecutor, EvictionResult + + +def test_evict_by_pid_sends_sigterm_then_sigkill(): + executor = EvictionExecutor(grace_period_s=0.01) + # pid_exists always True → grace period expires → SIGKILL fires + with patch("os.kill") as mock_kill, \ + patch("circuitforge_core.resources.agent.eviction_executor.psutil") as mock_psutil: + mock_psutil.pid_exists.return_value = True + result = executor.evict_pid(pid=1234, grace_period_s=0.01) + + assert result.success is True + calls = mock_kill.call_args_list + assert call(1234, signal.SIGTERM) in calls + assert call(1234, signal.SIGKILL) in calls + + +def test_evict_pid_succeeds_on_sigterm_alone(): + executor = EvictionExecutor(grace_period_s=0.1) + with patch("os.kill"), \ + patch("circuitforge_core.resources.agent.eviction_executor.psutil") as mock_psutil: + mock_psutil.pid_exists.side_effect = [True, False] # gone after SIGTERM + result = executor.evict_pid(pid=5678, grace_period_s=0.01) + assert result.success is True + assert result.method == "sigterm" + + +def test_evict_pid_not_found_returns_failure(): + executor = EvictionExecutor() + with patch("circuitforge_core.resources.agent.eviction_executor.psutil") as mock_psutil: + mock_psutil.pid_exists.return_value = False + result = executor.evict_pid(pid=9999) + assert result.success is False + assert "not found" in result.message.lower() + + +def test_eviction_result_is_immutable(): + result = EvictionResult(success=True, method="sigterm", message="ok") + with pytest.raises((AttributeError, TypeError)): + result.success = False # type: ignore diff --git a/tests/test_resources/test_gpu_monitor.py b/tests/test_resources/test_gpu_monitor.py new file mode 100644 index 0000000..617f592 --- /dev/null +++ b/tests/test_resources/test_gpu_monitor.py @@ -0,0 +1,60 @@ +from unittest.mock import patch +from circuitforge_core.resources.agent.gpu_monitor import GpuMonitor + + +SAMPLE_NVIDIA_SMI_OUTPUT = ( + "0, Quadro RTX 4000, 8192, 6843, 1349\n" + "1, Quadro RTX 4000, 8192, 721, 7471\n" +) + + +def test_parse_returns_list_of_gpu_info(): + monitor = GpuMonitor() + with patch("circuitforge_core.resources.agent.gpu_monitor.subprocess.run") as mock_run: + mock_run.return_value.returncode = 0 + mock_run.return_value.stdout = SAMPLE_NVIDIA_SMI_OUTPUT + gpus = monitor.poll() + assert len(gpus) == 2 + assert gpus[0].gpu_id == 0 + assert gpus[0].name == "Quadro RTX 4000" + assert gpus[0].vram_total_mb == 8192 + assert gpus[0].vram_used_mb == 6843 + assert gpus[0].vram_free_mb == 1349 + + +def test_parse_second_gpu(): + monitor = GpuMonitor() + with patch("circuitforge_core.resources.agent.gpu_monitor.subprocess.run") as mock_run: + mock_run.return_value.returncode = 0 + mock_run.return_value.stdout = SAMPLE_NVIDIA_SMI_OUTPUT + gpus = monitor.poll() + assert gpus[1].gpu_id == 1 + assert gpus[1].vram_used_mb == 721 + assert gpus[1].vram_free_mb == 7471 + + +def test_poll_returns_empty_list_when_nvidia_smi_unavailable(): + monitor = GpuMonitor() + with patch("circuitforge_core.resources.agent.gpu_monitor.subprocess.run", side_effect=FileNotFoundError): + gpus = monitor.poll() + assert gpus == [] + + +def test_poll_returns_empty_list_on_nonzero_exit(): + monitor = GpuMonitor() + with patch("circuitforge_core.resources.agent.gpu_monitor.subprocess.run") as mock_run: + mock_run.return_value.returncode = 1 + mock_run.return_value.stdout = "" + gpus = monitor.poll() + assert gpus == [] + + +def test_poll_skips_malformed_lines(): + monitor = GpuMonitor() + malformed = "0, RTX 4000, 8192, not_a_number, 1024\n1, RTX 4000, 8192, 512, 7680\n" + with patch("circuitforge_core.resources.agent.gpu_monitor.subprocess.run") as mock_run: + mock_run.return_value.returncode = 0 + mock_run.return_value.stdout = malformed + gpus = monitor.poll() + assert len(gpus) == 1 + assert gpus[0].gpu_id == 1 diff --git a/tests/test_resources/test_integration.py b/tests/test_resources/test_integration.py new file mode 100644 index 0000000..814bb32 --- /dev/null +++ b/tests/test_resources/test_integration.py @@ -0,0 +1,219 @@ +"""Integration test: full lease → eviction → re-grant cycle. + +Runs coordinator in-process (no subprocesses, no real nvidia-smi). +Uses TestClient for HTTP, mocks AgentSupervisor to return fixed node state. +""" +import pytest +from unittest.mock import MagicMock +from fastapi.testclient import TestClient + +from circuitforge_core.resources.coordinator.lease_manager import LeaseManager +from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry +from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor +from circuitforge_core.resources.coordinator.app import create_coordinator_app +from circuitforge_core.resources.models import GpuInfo, NodeInfo + + +@pytest.fixture +def system(): + """Create an in-process coordinator system with 8GB GPU and mock supervisor.""" + lease_manager = LeaseManager() + lease_manager.register_gpu("local", 0, 8192) + + mock_supervisor = MagicMock(spec=AgentSupervisor) + mock_supervisor.all_nodes.return_value = [ + NodeInfo( + node_id="local", + agent_url="http://localhost:7701", + gpus=[GpuInfo( + gpu_id=0, + name="RTX 4000", + vram_total_mb=8192, + vram_used_mb=0, + vram_free_mb=8192, + )], + last_heartbeat=0.0, + ) + ] + mock_supervisor.get_node_info.return_value = NodeInfo( + node_id="local", + agent_url="http://localhost:7701", + gpus=[], + last_heartbeat=0.0, + ) + + profile_registry = ProfileRegistry() + app = create_coordinator_app( + lease_manager=lease_manager, + profile_registry=profile_registry, + agent_supervisor=mock_supervisor, + ) + client = TestClient(app) + return client, lease_manager + + +def test_full_lease_cycle(system): + """Test: grant, verify, release, verify gone.""" + client, _ = system + + # Grant a lease + resp = client.post("/api/leases", json={ + "node_id": "local", + "gpu_id": 0, + "mb": 4096, + "service": "peregrine", + "priority": 1, + }) + assert resp.status_code == 200 + lease_data = resp.json()["lease"] + lease_id = lease_data["lease_id"] + assert lease_data["mb_granted"] == 4096 + assert lease_data["holder_service"] == "peregrine" + + # Verify it appears in active leases + resp = client.get("/api/leases") + assert resp.status_code == 200 + leases = resp.json()["leases"] + assert any(l["lease_id"] == lease_id for l in leases) + + # Release it + resp = client.delete(f"/api/leases/{lease_id}") + assert resp.status_code == 200 + assert resp.json()["released"] is True + + # Verify it's gone + resp = client.get("/api/leases") + assert resp.status_code == 200 + leases = resp.json()["leases"] + assert not any(l["lease_id"] == lease_id for l in leases) + + +def test_vram_exhaustion_returns_503(system): + """Test: fill GPU, then request with no eviction candidates returns 503.""" + client, _ = system + + # Fill GPU 0 with high-priority lease + resp = client.post("/api/leases", json={ + "node_id": "local", + "gpu_id": 0, + "mb": 8000, + "service": "vllm", + "priority": 1, + }) + assert resp.status_code == 200 + + # Try to get more VRAM with same priority (no eviction candidates) + resp = client.post("/api/leases", json={ + "node_id": "local", + "gpu_id": 0, + "mb": 2000, + "service": "kiwi", + "priority": 1, + }) + assert resp.status_code == 503 + assert "Insufficient VRAM" in resp.json()["detail"] + + +def test_auto_detect_profile_for_8gb(): + """Test: ProfileRegistry auto-detects single-gpu-8gb for 8GB GPU.""" + registry = ProfileRegistry() + gpu = GpuInfo( + gpu_id=0, + name="RTX 4000", + vram_total_mb=8192, + vram_used_mb=0, + vram_free_mb=8192, + ) + profile = registry.auto_detect([gpu]) + assert profile.name == "single-gpu-8gb" + # Verify profile has services configured + assert hasattr(profile, "services") + + +def test_node_endpoint_shows_nodes(system): + """Test: GET /api/nodes returns the mocked node.""" + client, _ = system + resp = client.get("/api/nodes") + assert resp.status_code == 200 + nodes = resp.json()["nodes"] + assert len(nodes) == 1 + assert nodes[0]["node_id"] == "local" + assert nodes[0]["agent_url"] == "http://localhost:7701" + assert len(nodes[0]["gpus"]) == 1 + assert nodes[0]["gpus"][0]["name"] == "RTX 4000" + + +def test_profiles_endpoint_returns_public_profiles(system): + """Test: GET /api/profiles returns standard public profiles.""" + client, _ = system + resp = client.get("/api/profiles") + assert resp.status_code == 200 + profiles = resp.json()["profiles"] + names = [p["name"] for p in profiles] + # Verify common public profiles are present + assert "single-gpu-8gb" in names + assert "single-gpu-6gb" in names + assert "single-gpu-2gb" in names + + +def test_multiple_leases_tracked_independently(system): + """Test: multiple active leases are tracked correctly.""" + client, _ = system + + # Grant lease 1 + resp1 = client.post("/api/leases", json={ + "node_id": "local", + "gpu_id": 0, + "mb": 2048, + "service": "peregrine", + "priority": 2, + }) + assert resp1.status_code == 200 + lease1_id = resp1.json()["lease"]["lease_id"] + + # Grant lease 2 + resp2 = client.post("/api/leases", json={ + "node_id": "local", + "gpu_id": 0, + "mb": 2048, + "service": "kiwi", + "priority": 2, + }) + assert resp2.status_code == 200 + lease2_id = resp2.json()["lease"]["lease_id"] + + # Both should be in active leases + resp = client.get("/api/leases") + leases = resp.json()["leases"] + lease_ids = [l["lease_id"] for l in leases] + assert lease1_id in lease_ids + assert lease2_id in lease_ids + assert len(leases) == 2 + + # Release lease 1 + resp = client.delete(f"/api/leases/{lease1_id}") + assert resp.status_code == 200 + + # Only lease 2 should remain + resp = client.get("/api/leases") + leases = resp.json()["leases"] + lease_ids = [l["lease_id"] for l in leases] + assert lease1_id not in lease_ids + assert lease2_id in lease_ids + assert len(leases) == 1 + + +def test_delete_nonexistent_lease_returns_404(system): + """Test: deleting a nonexistent lease returns 404.""" + client, _ = system + resp = client.delete("/api/leases/nonexistent-lease-id") + assert resp.status_code == 404 + assert "not found" in resp.json()["detail"] + + +def test_health_endpoint_returns_ok(system): + """Test: GET /api/health returns status ok.""" + client, _ = system + resp = client.get("/api/health") + assert resp.status_code == 200 + assert resp.json()["status"] == "ok" diff --git a/tests/test_resources/test_lease_manager.py b/tests/test_resources/test_lease_manager.py new file mode 100644 index 0000000..cede687 --- /dev/null +++ b/tests/test_resources/test_lease_manager.py @@ -0,0 +1,85 @@ +import pytest +from circuitforge_core.resources.coordinator.lease_manager import LeaseManager + + +@pytest.fixture +def mgr(): + m = LeaseManager() + m.register_gpu(node_id="heimdall", gpu_id=0, total_mb=8192) + return m + + +@pytest.mark.asyncio +async def test_grant_succeeds_when_vram_available(mgr): + lease = await mgr.try_grant( + node_id="heimdall", gpu_id=0, mb=4096, + service="peregrine", priority=1 + ) + assert lease is not None + assert lease.mb_granted == 4096 + assert lease.node_id == "heimdall" + assert lease.gpu_id == 0 + + +@pytest.mark.asyncio +async def test_grant_fails_when_vram_insufficient(mgr): + await mgr.try_grant(node_id="heimdall", gpu_id=0, mb=7000, + service="vllm", priority=1) + lease = await mgr.try_grant(node_id="heimdall", gpu_id=0, mb=2000, + service="kiwi", priority=2) + assert lease is None + + +@pytest.mark.asyncio +async def test_release_frees_vram(mgr): + lease = await mgr.try_grant(node_id="heimdall", gpu_id=0, mb=7000, + service="vllm", priority=1) + assert lease is not None + released = await mgr.release(lease.lease_id) + assert released is True + lease2 = await mgr.try_grant(node_id="heimdall", gpu_id=0, mb=7000, + service="comfyui", priority=4) + assert lease2 is not None + + +@pytest.mark.asyncio +async def test_release_unknown_lease_returns_false(mgr): + result = await mgr.release("nonexistent-id") + assert result is False + + +@pytest.mark.asyncio +async def test_get_eviction_candidates_returns_lower_priority_leases(mgr): + await mgr.try_grant(node_id="heimdall", gpu_id=0, mb=3000, + service="comfyui", priority=4) + await mgr.try_grant(node_id="heimdall", gpu_id=0, mb=2000, + service="ollama", priority=1) + candidates = mgr.get_eviction_candidates( + node_id="heimdall", gpu_id=0, + needed_mb=3000, requester_priority=2 + ) + assert len(candidates) == 1 + assert candidates[0].holder_service == "comfyui" + + +@pytest.mark.asyncio +async def test_list_leases_for_gpu(mgr): + await mgr.try_grant(node_id="heimdall", gpu_id=0, mb=1024, + service="peregrine", priority=1) + await mgr.try_grant(node_id="heimdall", gpu_id=0, mb=512, + service="kiwi", priority=2) + leases = mgr.list_leases(node_id="heimdall", gpu_id=0) + assert len(leases) == 2 + + +def test_register_gpu_sets_total(mgr): + assert mgr.gpu_total_mb("heimdall", 0) == 8192 + + +@pytest.mark.asyncio +async def test_used_mb_tracks_grants(): + mgr = LeaseManager() + mgr.register_gpu("heimdall", 0, 8192) + await mgr.try_grant("heimdall", 0, 3000, "a", 1) + await mgr.try_grant("heimdall", 0, 2000, "b", 2) + assert mgr.used_mb("heimdall", 0) == 5000 diff --git a/tests/test_resources/test_models.py b/tests/test_resources/test_models.py new file mode 100644 index 0000000..c8e5ac4 --- /dev/null +++ b/tests/test_resources/test_models.py @@ -0,0 +1,47 @@ +import time +import pytest +from circuitforge_core.resources.models import VRAMLease, GpuInfo, NodeInfo + + +def test_vram_lease_create_assigns_unique_ids(): + lease_a = VRAMLease.create(gpu_id=0, node_id="heimdall", mb=4096, + service="peregrine", priority=1) + lease_b = VRAMLease.create(gpu_id=0, node_id="heimdall", mb=4096, + service="peregrine", priority=1) + assert lease_a.lease_id != lease_b.lease_id + + +def test_vram_lease_create_with_ttl_sets_expiry(): + before = time.time() + lease = VRAMLease.create(gpu_id=0, node_id="heimdall", mb=2048, + service="kiwi", priority=2, ttl_s=60.0) + after = time.time() + assert before + 60.0 <= lease.expires_at <= after + 60.0 + + +def test_vram_lease_create_no_ttl_has_zero_expiry(): + lease = VRAMLease.create(gpu_id=0, node_id="heimdall", mb=1024, + service="snipe", priority=2) + assert lease.expires_at == 0.0 + + +def test_vram_lease_is_immutable(): + lease = VRAMLease.create(gpu_id=0, node_id="heimdall", mb=1024, + service="snipe", priority=2) + with pytest.raises((AttributeError, TypeError)): + lease.mb_granted = 999 # type: ignore + + +def test_gpu_info_fields(): + info = GpuInfo(gpu_id=0, name="RTX 4000", vram_total_mb=8192, + vram_used_mb=2048, vram_free_mb=6144) + assert info.vram_free_mb == 6144 + + +def test_node_info_fields(): + gpu = GpuInfo(gpu_id=0, name="RTX 4000", vram_total_mb=8192, + vram_used_mb=0, vram_free_mb=8192) + node = NodeInfo(node_id="heimdall", agent_url="http://localhost:7701", + gpus=[gpu], last_heartbeat=time.time()) + assert node.node_id == "heimdall" + assert len(node.gpus) == 1 diff --git a/tests/test_resources/test_profile_registry.py b/tests/test_resources/test_profile_registry.py new file mode 100644 index 0000000..e55bcfa --- /dev/null +++ b/tests/test_resources/test_profile_registry.py @@ -0,0 +1,101 @@ +# tests/test_resources/test_profile_registry.py +import pytest +from unittest.mock import MagicMock + +from circuitforge_core.resources.profiles.schema import ( + GpuProfile, ServiceProfile, load_profile +) +from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry + + +def test_load_8gb_profile(tmp_path): + yaml_content = """ +schema_version: 1 +name: single-gpu-8gb +vram_total_mb: 8192 +eviction_timeout_s: 10.0 +services: + vllm: + max_mb: 5120 + priority: 1 + cf-vision: + max_mb: 2048 + priority: 2 + shared: true + max_concurrent: 3 +""" + profile_file = tmp_path / "test.yaml" + profile_file.write_text(yaml_content) + profile = load_profile(profile_file) + + assert profile.name == "single-gpu-8gb" + assert profile.schema_version == 1 + assert profile.vram_total_mb == 8192 + assert profile.eviction_timeout_s == 10.0 + assert "vllm" in profile.services + assert profile.services["vllm"].max_mb == 5120 + assert profile.services["vllm"].priority == 1 + assert profile.services["cf-vision"].shared is True + assert profile.services["cf-vision"].max_concurrent == 3 + + +def test_load_profile_rejects_wrong_schema_version(tmp_path): + yaml_content = "schema_version: 99\nname: future\n" + profile_file = tmp_path / "future.yaml" + profile_file.write_text(yaml_content) + with pytest.raises(ValueError, match="schema_version"): + load_profile(profile_file) + + +def test_service_profile_defaults(): + svc = ServiceProfile(max_mb=1024, priority=2) + assert svc.shared is False + assert svc.max_concurrent == 1 + assert svc.always_on is False + assert svc.backend is None + assert svc.consumers == [] + + +def test_profile_registry_loads_public_profiles(): + registry = ProfileRegistry() + profiles = registry.list_public() + names = [p.name for p in profiles] + assert "single-gpu-8gb" in names + assert "single-gpu-6gb" in names + assert "single-gpu-2gb" in names + + +def test_profile_registry_auto_detect_selects_8gb(): + registry = ProfileRegistry() + mock_gpus = [ + MagicMock(vram_total_mb=8192), + ] + profile = registry.auto_detect(mock_gpus) + assert profile.name == "single-gpu-8gb" + + +def test_profile_registry_auto_detect_selects_6gb(): + registry = ProfileRegistry() + mock_gpus = [MagicMock(vram_total_mb=6144)] + profile = registry.auto_detect(mock_gpus) + assert profile.name == "single-gpu-6gb" + + +def test_profile_registry_auto_detect_selects_2gb(): + registry = ProfileRegistry() + mock_gpus = [MagicMock(vram_total_mb=2048)] + profile = registry.auto_detect(mock_gpus) + assert profile.name == "single-gpu-2gb" + + +def test_profile_registry_load_from_path(tmp_path): + yaml_content = ( + "schema_version: 1\nname: custom\n" + "vram_total_mb: 12288\neviction_timeout_s: 5.0\n" + ) + p = tmp_path / "custom.yaml" + p.write_text(yaml_content) + registry = ProfileRegistry() + profile = registry.load(p) + assert profile.name == "custom" + assert profile.vram_total_mb == 12288