From c78341fc6f7f1b96fa18a8bfe0dded5ca4b0a671 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Thu, 2 Apr 2026 15:33:08 -0700 Subject: [PATCH 1/4] feat(orch): replace Ouro/vllm-Docker with generic HF inference server; add ProcessSpec MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add circuitforge_core/resources/inference/llm_server.py: generic OpenAI-compatible FastAPI server for any HuggingFace causal LM (Phi-4-mini-instruct, Qwen2.5-3B-Instruct) - Add service_manager.py + service_probe.py: ProcessSpec start/stop/is_running support (Popen-based; socket probe confirms readiness before marking running) - Update all 4 public GPU profiles to use ProcessSpec→llm_server instead of Docker vllm: 6gb (max_mb 5500), 8gb (max_mb 6500), 16gb/24gb (max_mb 9000) - Model candidates: Phi-4-mini-instruct first (7.2GB), Qwen2.5-3B-Instruct fallback (5.8GB) - Remove ouro_server.py (Ouro incompatible with transformers 5.x; vllm Docker also incompatible) - Add 17 tests for ServiceManager ProcessSpec (start/stop/is_running/list/get_url) --- .gitignore | 3 + circuitforge_core/db/base.py | 6 +- circuitforge_core/resources/agent/app.py | 43 +++- .../resources/agent/service_manager.py | 169 +++++++++++++++ .../resources/agent/service_probe.py | 123 +++++++++++ .../resources/inference/__init__.py | 0 .../resources/inference/llm_server.py | 135 ++++++++++++ circuitforge_core/resources/models.py | 10 + .../profiles/public/single-gpu-16gb.yaml | 9 +- .../profiles/public/single-gpu-24gb.yaml | 9 +- .../profiles/public/single-gpu-6gb.yaml | 13 +- .../profiles/public/single-gpu-8gb.yaml | 13 +- tests/test_resources/test_service_manager.py | 194 ++++++++++++++++++ 13 files changed, 707 insertions(+), 20 deletions(-) create mode 100644 circuitforge_core/resources/agent/service_manager.py create mode 100644 circuitforge_core/resources/agent/service_probe.py create mode 100644 circuitforge_core/resources/inference/__init__.py create mode 100644 
circuitforge_core/resources/inference/llm_server.py create mode 100644 tests/test_resources/test_service_manager.py diff --git a/.gitignore b/.gitignore index 7d7f62b..cf5f809 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,9 @@ __pycache__/ dist/ .pytest_cache/ .superpowers/ +.coverage +build/ +" sqlite3.Connection: return conn # timeout=30: retry for up to 30s when another writer holds the lock (WAL mode # allows concurrent readers but only one writer at a time). - return sqlite3.connect(str(db_path), timeout=30) + # check_same_thread=False: each Store is owned by exactly one request; FastAPI + # uses asyncio.to_thread() to run sync DB calls in a worker thread, crossing + # the thread boundary that sqlite3 guards by default. Since no two threads share + # the same connection, disabling the check is safe. + return sqlite3.connect(str(db_path), timeout=30, check_same_thread=False) diff --git a/circuitforge_core/resources/agent/app.py b/circuitforge_core/resources/agent/app.py index 820c79a..0045041 100644 --- a/circuitforge_core/resources/agent/app.py +++ b/circuitforge_core/resources/agent/app.py @@ -3,11 +3,12 @@ from __future__ import annotations import logging from typing import Any -from fastapi import FastAPI +from fastapi import FastAPI, HTTPException from pydantic import BaseModel from circuitforge_core.resources.agent.eviction_executor import EvictionExecutor from circuitforge_core.resources.agent.gpu_monitor import GpuMonitor +from circuitforge_core.resources.agent.service_manager import ServiceManager logger = logging.getLogger(__name__) @@ -17,10 +18,16 @@ class EvictRequest(BaseModel): grace_period_s: float = 5.0 +class ServiceStartRequest(BaseModel): + gpu_id: int = 0 + params: dict[str, str] = {} + + def create_agent_app( node_id: str, monitor: GpuMonitor | None = None, executor: EvictionExecutor | None = None, + service_manager: ServiceManager | None = None, ) -> FastAPI: _monitor = monitor or GpuMonitor() _executor = executor or 
EvictionExecutor() @@ -57,4 +64,38 @@ def create_agent_app( "message": result.message, } + @app.get("/resident-info") + def resident_info() -> dict[str, Any]: + """Return which models are currently loaded in each running managed service.""" + if service_manager is None: + return {"residents": []} + from circuitforge_core.resources.agent.service_probe import probe_all + return {"residents": probe_all(service_manager)} + + if service_manager is not None: + @app.get("/services") + def list_services() -> dict: + return {"running": service_manager.list_running()} + + @app.get("/services/{service}") + def service_status(service: str) -> dict: + running = service_manager.is_running(service) + url = service_manager.get_url(service) if running else None + return {"service": service, "running": running, "url": url} + + @app.post("/services/{service}/start") + def start_service(service: str, req: ServiceStartRequest) -> dict: + try: + url = service_manager.start(service, req.gpu_id, req.params) + return {"service": service, "url": url, "running": True} + except (ValueError, NotImplementedError) as exc: + raise HTTPException(status_code=422, detail=str(exc)) + except Exception as exc: + raise HTTPException(status_code=500, detail=f"Failed to start {service}: {exc}") + + @app.post("/services/{service}/stop") + def stop_service(service: str) -> dict: + stopped = service_manager.stop(service) + return {"service": service, "stopped": stopped} + return app diff --git a/circuitforge_core/resources/agent/service_manager.py b/circuitforge_core/resources/agent/service_manager.py new file mode 100644 index 0000000..336330b --- /dev/null +++ b/circuitforge_core/resources/agent/service_manager.py @@ -0,0 +1,169 @@ +""" +ServiceManager — start/stop Docker containers and processes for cf-orch managed services. 
+ +Container naming convention: cf-orch-{service}-{node_id} +""" +from __future__ import annotations + +import os +import re +import subprocess +from collections import defaultdict +from typing import Any + +from circuitforge_core.resources.profiles.schema import DockerSpec, GpuProfile, ProcessSpec + + +def _expand_volume(v: str) -> str: + """Expand bash-style volume strings including ${VAR:-default} and $VAR.""" + def _sub(m: re.Match) -> str: # type: ignore[type-arg] + var, default = m.group(1), m.group(2) or "" + return os.environ.get(var) or default + v = re.sub(r"\$\{(\w+)(?::-(.*?))?\}", _sub, v) + v = re.sub(r"\$(\w+)", lambda m: os.environ.get(m.group(1), m.group(0)), v) + return v + + +class ServiceManager: + def __init__( + self, + node_id: str, + profile: GpuProfile, + advertise_host: str = "127.0.0.1", + ) -> None: + self.node_id = node_id + self.profile = profile + self.advertise_host = advertise_host + self._procs: dict[str, Any] = {} + + def container_name(self, service: str) -> str: + return f"cf-orch-{service}-{self.node_id}" + + def _get_spec(self, service: str) -> DockerSpec | ProcessSpec | None: + svc = self.profile.services.get(service) + if svc is None: + return None + return svc.managed + + def is_running(self, service: str) -> bool: + spec = self._get_spec(service) + if spec is None: + return False + if isinstance(spec, DockerSpec): + try: + result = subprocess.run( + [ + "docker", + "inspect", + "--format", + "{{.State.Running}}", + self.container_name(service), + ], + capture_output=True, + text=True, + check=True, + ) + return result.stdout.strip() == "true" + except subprocess.CalledProcessError: + return False + if isinstance(spec, ProcessSpec): + proc = self._procs.get(service) + if proc is None or proc.poll() is not None: + return False + import socket + try: + with socket.create_connection(("127.0.0.1", spec.host_port), timeout=1): + return True + except OSError: + return False + return False + + def start(self, service: str, gpu_id: 
int, params: dict[str, str]) -> str: + spec = self._get_spec(service) + if spec is None: + raise ValueError(f"Service {service!r} not in profile or has no managed spec") + + if self.is_running(service): + return f"http://{self.advertise_host}:{spec.host_port}" + + if isinstance(spec, DockerSpec): + expanded_volumes = [_expand_volume(v) for v in spec.volumes] + + filler: dict[str, str] = defaultdict(str, params) + expanded_command = spec.command_template.format_map(filler).split() + + cmd = [ + "docker", "run", "-d", "--rm", + "--name", self.container_name(service), + "--runtime", spec.runtime, + "--gpus", f"device={gpu_id}", + "--ipc", spec.ipc, + "-p", f"{spec.host_port}:{spec.port}", + ] + for vol in expanded_volumes: + cmd += ["-v", vol] + for key, val in spec.env.items(): + cmd += ["-e", f"{key}={val}"] + cmd.append(spec.image) + cmd.extend(expanded_command) + + subprocess.run(cmd, check=True, capture_output=True, text=True) + return f"http://{self.advertise_host}:{spec.host_port}" + + if isinstance(spec, ProcessSpec): + import shlex + import subprocess as _sp + + filler = defaultdict(str, params) + filler.setdefault("port", str(spec.port)) + filler.setdefault("gpu_id", str(gpu_id)) + args_expanded = spec.args_template.format_map(filler).split() + + cmd = [spec.exec_path] + args_expanded + env = {**__import__("os").environ} + proc = _sp.Popen( + cmd, + cwd=spec.cwd or None, + env=env, + stdout=_sp.DEVNULL, + stderr=_sp.DEVNULL, + ) + self._procs[service] = proc + return f"http://{self.advertise_host}:{spec.host_port}" + + raise NotImplementedError(f"Unknown spec type: {type(spec)}") + + def stop(self, service: str) -> bool: + spec = self._get_spec(service) + if spec is None: + return False + if isinstance(spec, DockerSpec): + try: + subprocess.run( + ["docker", "stop", self.container_name(service)], + check=True, + capture_output=True, + text=True, + ) + return True + except subprocess.CalledProcessError: + return False + if isinstance(spec, ProcessSpec): + 
proc = self._procs.pop(service, None) + if proc is not None: + proc.terminate() + try: + proc.wait(timeout=10) + except Exception: + proc.kill() + return True + return False + + def list_running(self) -> list[str]: + return [svc for svc in self.profile.services if self.is_running(svc)] + + def get_url(self, service: str) -> str | None: + spec = self._get_spec(service) + if spec is None or not self.is_running(service): + return None + return f"http://{self.advertise_host}:{spec.host_port}" diff --git a/circuitforge_core/resources/agent/service_probe.py b/circuitforge_core/resources/agent/service_probe.py new file mode 100644 index 0000000..e2b6efa --- /dev/null +++ b/circuitforge_core/resources/agent/service_probe.py @@ -0,0 +1,123 @@ +""" +Probe running services to detect which models are currently loaded in VRAM. + +Two probe strategies run together: + +1. Well-known ports — always checked, regardless of who started the service. + Catches ollama, vLLM, etc. running outside cf-orch management. + +2. Managed services — services cf-orch started via ServiceManager. + Checked on their configured host_port, deduplicates with well-known results. + +Each service exposes a different introspection API: + - vllm: GET /v1/models → {"data": [{"id": "<model-id>"}]} + - ollama: GET /api/ps → {"models": [{"name": "<model-name>", "size_vram": <bytes>}]} + +ollama can have multiple models loaded simultaneously; each is reported as a +separate entry so the dashboard shows per-model residency. + +The probe is best-effort: a timeout or connection refusal means model_name=None +but the service is still reported as resident. +""" +from __future__ import annotations + +import json +import logging +import urllib.request +from typing import Any + +from circuitforge_core.resources.profiles.schema import DockerSpec + +logger = logging.getLogger(__name__) + +_PROBE_TIMEOUT_S = 2.0 + +# Well-known service ports probed on every heartbeat.
+# port → service name (the name is also the lookup key into _PROBERS) +_WELL_KNOWN_PORTS: dict[int, str] = { + 11434: "ollama", + 8000: "vllm", + 8080: "vllm", # common alt vLLM port +} + + +def _fetch_json(url: str) -> dict[str, Any] | None: + """GET a URL and parse JSON; returns None on any error.""" + try: + with urllib.request.urlopen(url, timeout=_PROBE_TIMEOUT_S) as resp: + return json.loads(resp.read()) + except Exception as exc: + logger.debug("Probe %s: %s", url, exc) + return None + + +def _probe_vllm(port: int) -> list[str]: + data = _fetch_json(f"http://127.0.0.1:{port}/v1/models") + if data and data.get("data"): + return [m["id"] for m in data["data"] if m.get("id")] + return [] + + +def _probe_ollama(port: int) -> list[str]: + # /api/ps lists models currently *loaded in memory*, not just downloaded. + data = _fetch_json(f"http://127.0.0.1:{port}/api/ps") + if data and data.get("models"): + return [m["name"] for m in data["models"] if m.get("name")] + return [] + + +_PROBERS: dict[str, Any] = { + "vllm": _probe_vllm, + "ollama": _probe_ollama, +} + + +def probe_all(service_manager: Any) -> list[dict[str, Any]]: + """ + Probe all services — both well-known ports and cf-orch managed services. + + Returns a list of dicts: [{"service": str, "model_name": str | None}]. + Multiple loaded models in one service (e.g. two ollama models) each get + their own entry, disambiguated as "ollama/0", "ollama/1", etc. + """ + results: list[dict[str, Any]] = [] + seen_ports: set[int] = set() + + # ── 1. Well-known ports ────────────────────────────────────────── + for port, service in _WELL_KNOWN_PORTS.items(): + prober = _PROBERS.get(service) + if prober is None: + continue + models = prober(port) + if not models: + continue # nothing on this port right now + seen_ports.add(port) + if len(models) == 1: + results.append({"service": service, "model_name": models[0]}) + else: + for i, model in enumerate(models): + results.append({"service": f"{service}/{i}", "model_name": model}) + + # ── 2.
Managed services (cf-orch started) ─────────────────────── + if service_manager is not None: + for service in service_manager.list_running(): + spec = service_manager._get_spec(service) + if not isinstance(spec, DockerSpec): + continue + if spec.host_port in seen_ports: + continue # already captured by well-known probe + prober = _PROBERS.get(service) + if prober is None: + results.append({"service": service, "model_name": None}) + continue + models = prober(spec.host_port) + seen_ports.add(spec.host_port) + if not models: + results.append({"service": service, "model_name": None}) + elif len(models) == 1: + results.append({"service": service, "model_name": models[0]}) + else: + for i, model in enumerate(models): + results.append({"service": f"{service}/{i}", "model_name": model}) + + return results diff --git a/circuitforge_core/resources/inference/__init__.py b/circuitforge_core/resources/inference/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/circuitforge_core/resources/inference/llm_server.py b/circuitforge_core/resources/inference/llm_server.py new file mode 100644 index 0000000..932e860 --- /dev/null +++ b/circuitforge_core/resources/inference/llm_server.py @@ -0,0 +1,135 @@ +"""Generic OpenAI-compatible inference server for HuggingFace causal LMs.""" +from __future__ import annotations + +import argparse +import time +import uuid +from contextlib import asynccontextmanager +from typing import Any + +import torch +import uvicorn +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel +from transformers import AutoModelForCausalLM, AutoTokenizer + +_model: Any = None +_tokenizer: Any = None +_model_id: str = "" +_device: str = "cpu" + + +@asynccontextmanager +async def lifespan(app: FastAPI): + yield + + +app = FastAPI(lifespan=lifespan) + + +class Message(BaseModel): + role: str + content: str + + +class ChatRequest(BaseModel): + model: str | None = None + messages: list[Message] + max_tokens: int | None = 512 + 
temperature: float | None = 0.7 + stream: bool | None = False + + +@app.get("/health") +def health() -> dict[str, str]: + return {"status": "ok", "model": _model_id} + + +@app.get("/v1/models") +def list_models() -> dict[str, Any]: + return { + "object": "list", + "data": [{"id": _model_id, "object": "model", "owned_by": "cf-orch"}], + } + + +@app.post("/v1/chat/completions") +def chat_completions(req: ChatRequest) -> dict[str, Any]: + if _model is None: + raise HTTPException(503, detail="Model not loaded") + if req.stream: + raise HTTPException(501, detail="Streaming not supported") + + conversation = [{"role": m.role, "content": m.content} for m in req.messages] + try: + input_ids = _tokenizer.apply_chat_template( + conversation, + return_tensors="pt", + add_generation_prompt=True, + ).to(_device) + except Exception as exc: + raise HTTPException(500, detail=f"Tokenisation failed: {exc}") + + max_new = req.max_tokens or 512 + temp = req.temperature if req.temperature is not None else 0.7 + gen_kwargs: dict[str, Any] = { + "max_new_tokens": max_new, + "do_sample": temp > 0, + "pad_token_id": _tokenizer.eos_token_id, + } + if temp > 0: + gen_kwargs["temperature"] = temp + + with torch.inference_mode(): + output_ids = _model.generate(input_ids, **gen_kwargs) + + new_tokens = output_ids[0][input_ids.shape[-1]:] + reply = _tokenizer.decode(new_tokens, skip_special_tokens=True) + + return { + "id": f"chatcmpl-{uuid.uuid4().hex[:8]}", + "object": "chat.completion", + "created": int(time.time()), + "model": _model_id, + "choices": [ + { + "index": 0, + "message": {"role": "assistant", "content": reply}, + "finish_reason": "stop", + } + ], + "usage": { + "prompt_tokens": input_ids.shape[-1], + "completion_tokens": len(new_tokens), + "total_tokens": input_ids.shape[-1] + len(new_tokens), + }, + } + + +def _load_model(model_path: str, gpu_id: int) -> None: + global _model, _tokenizer, _model_id, _device + _device = f"cuda:{gpu_id}" if torch.cuda.is_available() else "cpu" + 
_model_id = model_path + _tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True) + _model = AutoModelForCausalLM.from_pretrained( + model_path, + torch_dtype=torch.float16 if "cuda" in _device else torch.float32, + device_map={"": _device}, + trust_remote_code=True, + ) + _model.eval() + + +def main() -> None: + parser = argparse.ArgumentParser(description="cf-orch generic LLM inference server") + parser.add_argument("--model", required=True) + parser.add_argument("--port", type=int, default=8000) + parser.add_argument("--host", default="0.0.0.0") + parser.add_argument("--gpu-id", type=int, default=0) + args = parser.parse_args() + _load_model(args.model, args.gpu_id) + uvicorn.run(app, host=args.host, port=args.port, log_level="info") + + +if __name__ == "__main__": + main() diff --git a/circuitforge_core/resources/models.py b/circuitforge_core/resources/models.py index 53b4886..fb6a6ba 100644 --- a/circuitforge_core/resources/models.py +++ b/circuitforge_core/resources/models.py @@ -3,6 +3,7 @@ from __future__ import annotations import time import uuid from dataclasses import dataclass, field +from typing import Optional @dataclass(frozen=True) @@ -48,6 +49,15 @@ class GpuInfo: vram_free_mb: int +@dataclass(frozen=True) +class ResidentAllocation: + """A model that is loaded and warm in VRAM but not actively serving a request.""" + service: str + node_id: str + model_name: Optional[str] # None if service is running but model probe failed + first_seen: float = field(default_factory=time.time) + + @dataclass class NodeInfo: node_id: str diff --git a/circuitforge_core/resources/profiles/public/single-gpu-16gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-16gb.yaml index 84daf6a..23b6e54 100644 --- a/circuitforge_core/resources/profiles/public/single-gpu-16gb.yaml +++ b/circuitforge_core/resources/profiles/public/single-gpu-16gb.yaml @@ -4,9 +4,16 @@ vram_total_mb: 16384 eviction_timeout_s: 10.0 services: vllm: - max_mb: 12288 + 
max_mb: 9000 priority: 1 idle_stop_after_s: 600 + managed: + type: process + exec_path: "/devl/miniconda3/envs/cf/bin/python" + args_template: "-m circuitforge_core.resources.inference.llm_server --model /Library/Assets/LLM/vllm/models/{model} --port {port} --gpu-id {gpu_id}" + port: 8000 + host_port: 8000 + cwd: "/Library/Development/CircuitForge/circuitforge-core" ollama: max_mb: 12288 priority: 1 diff --git a/circuitforge_core/resources/profiles/public/single-gpu-24gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-24gb.yaml index e0ca256..243adf9 100644 --- a/circuitforge_core/resources/profiles/public/single-gpu-24gb.yaml +++ b/circuitforge_core/resources/profiles/public/single-gpu-24gb.yaml @@ -4,9 +4,16 @@ vram_total_mb: 24576 eviction_timeout_s: 10.0 services: vllm: - max_mb: 20480 + max_mb: 9000 priority: 1 idle_stop_after_s: 600 + managed: + type: process + exec_path: "/devl/miniconda3/envs/cf/bin/python" + args_template: "-m circuitforge_core.resources.inference.llm_server --model /Library/Assets/LLM/vllm/models/{model} --port {port} --gpu-id {gpu_id}" + port: 8000 + host_port: 8000 + cwd: "/Library/Development/CircuitForge/circuitforge-core" ollama: max_mb: 18432 priority: 1 diff --git a/circuitforge_core/resources/profiles/public/single-gpu-6gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-6gb.yaml index 1888aa4..935a536 100644 --- a/circuitforge_core/resources/profiles/public/single-gpu-6gb.yaml +++ b/circuitforge_core/resources/profiles/public/single-gpu-6gb.yaml @@ -4,19 +4,16 @@ vram_total_mb: 6144 eviction_timeout_s: 10.0 services: vllm: - max_mb: 4096 + max_mb: 5500 priority: 1 idle_stop_after_s: 600 managed: - type: docker - image: "vllm/vllm-openai:v0.9.2" + type: process + exec_path: "/devl/miniconda3/envs/cf/bin/python" + args_template: "-m circuitforge_core.resources.inference.llm_server --model /Library/Assets/LLM/vllm/models/{model} --port {port} --gpu-id {gpu_id}" port: 8000 host_port: 8000 - 
command_template: "--model /models/{model} --trust-remote-code --max-model-len {max_model_len} --gpu-memory-utilization {gpu_mem_util} --enforce-eager --max-num-seqs 8" - volumes: - - "${VLLM_MODELS_DIR:-/Library/Assets/LLM/vllm/models}:/models" - runtime: nvidia - ipc: host + cwd: "/Library/Development/CircuitForge/circuitforge-core" ollama: max_mb: 3584 priority: 1 diff --git a/circuitforge_core/resources/profiles/public/single-gpu-8gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-8gb.yaml index 614416d..7d62939 100644 --- a/circuitforge_core/resources/profiles/public/single-gpu-8gb.yaml +++ b/circuitforge_core/resources/profiles/public/single-gpu-8gb.yaml @@ -4,19 +4,16 @@ vram_total_mb: 8192 eviction_timeout_s: 10.0 services: vllm: - max_mb: 5120 + max_mb: 6500 priority: 1 idle_stop_after_s: 600 managed: - type: docker - image: "vllm/vllm-openai:v0.9.2" + type: process + exec_path: "/devl/miniconda3/envs/cf/bin/python" + args_template: "-m circuitforge_core.resources.inference.llm_server --model /Library/Assets/LLM/vllm/models/{model} --port {port} --gpu-id {gpu_id}" port: 8000 host_port: 8000 - command_template: "--model /models/{model} --trust-remote-code --max-model-len {max_model_len} --gpu-memory-utilization {gpu_mem_util} --enforce-eager --max-num-seqs 8" - volumes: - - "${VLLM_MODELS_DIR:-/Library/Assets/LLM/vllm/models}:/models" - runtime: nvidia - ipc: host + cwd: "/Library/Development/CircuitForge/circuitforge-core" ollama: max_mb: 4096 priority: 1 diff --git a/tests/test_resources/test_service_manager.py b/tests/test_resources/test_service_manager.py new file mode 100644 index 0000000..a5c26eb --- /dev/null +++ b/tests/test_resources/test_service_manager.py @@ -0,0 +1,194 @@ +"""Tests for ServiceManager ProcessSpec support.""" +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +import pytest + +from circuitforge_core.resources.agent.service_manager import ServiceManager +from 
circuitforge_core.resources.profiles.schema import ( + GpuProfile, + ProcessSpec, + ServiceProfile, +) + + +def _make_profile(args_template: str = "--port {port} --gpu-id {gpu_id}") -> GpuProfile: + return GpuProfile( + schema_version=1, + name="test", + vram_total_mb=8192, + services={ + "vllm": ServiceProfile( + max_mb=5120, + priority=1, + managed=ProcessSpec( + exec_path="/usr/bin/python", + args_template=args_template, + port=8000, + host_port=8000, + cwd="/tmp", + ), + ), + "no_managed": ServiceProfile(max_mb=1024, priority=2), + }, + ) + + +@pytest.fixture +def manager(): + return ServiceManager(node_id="test-node", profile=_make_profile(), advertise_host="127.0.0.1") + + +# --------------------------------------------------------------------------- +# is_running +# --------------------------------------------------------------------------- + + +def test_is_running_returns_false_when_no_proc(manager): + assert manager.is_running("vllm") is False + + +def test_is_running_returns_false_when_proc_exited(manager): + mock_proc = MagicMock() + mock_proc.poll.return_value = 1 # exited + manager._procs["vllm"] = mock_proc + assert manager.is_running("vllm") is False + + +def test_is_running_returns_false_when_port_not_listening(manager): + mock_proc = MagicMock() + mock_proc.poll.return_value = None # still running + manager._procs["vllm"] = mock_proc + + with patch("socket.create_connection", side_effect=OSError("refused")): + assert manager.is_running("vllm") is False + + +def test_is_running_returns_true_when_proc_alive_and_port_open(manager): + mock_proc = MagicMock() + mock_proc.poll.return_value = None # still running + manager._procs["vllm"] = mock_proc + + mock_socket = MagicMock() + mock_socket.__enter__ = MagicMock(return_value=mock_socket) + mock_socket.__exit__ = MagicMock(return_value=False) + with patch("socket.create_connection", return_value=mock_socket): + assert manager.is_running("vllm") is True + + +def 
test_is_running_unknown_service_returns_false(manager): + assert manager.is_running("nonexistent") is False + + +def test_is_running_no_managed_spec_returns_false(manager): + assert manager.is_running("no_managed") is False + + +# --------------------------------------------------------------------------- +# start +# --------------------------------------------------------------------------- + + +def test_start_launches_process_and_returns_url(manager): + with patch("subprocess.Popen") as mock_popen, \ + patch.object(manager, "is_running", return_value=False): + mock_popen.return_value = MagicMock() + url = manager.start("vllm", gpu_id=0, params={"model": "mymodel"}) + + assert url == "http://127.0.0.1:8000" + mock_popen.assert_called_once() + call_args = mock_popen.call_args + cmd = call_args[0][0] + assert cmd[0] == "/usr/bin/python" + assert "--port" in cmd + assert "8000" in cmd + assert "--gpu-id" in cmd + assert "0" in cmd + + +def test_start_returns_url_immediately_when_already_running(manager): + with patch.object(manager, "is_running", return_value=True): + with patch("subprocess.Popen") as mock_popen: + url = manager.start("vllm", gpu_id=0, params={}) + + assert url == "http://127.0.0.1:8000" + mock_popen.assert_not_called() + + +def test_start_raises_for_unknown_service(manager): + with pytest.raises(ValueError, match="not in profile"): + manager.start("nonexistent", gpu_id=0, params={}) + + +def test_start_stores_proc_in_procs(manager): + mock_proc = MagicMock() + with patch("subprocess.Popen", return_value=mock_proc), \ + patch.object(manager, "is_running", return_value=False): + manager.start("vllm", gpu_id=0, params={}) + + assert manager._procs["vllm"] is mock_proc + + +# --------------------------------------------------------------------------- +# stop +# --------------------------------------------------------------------------- + + +def test_stop_terminates_running_process(manager): + mock_proc = MagicMock() + manager._procs["vllm"] = mock_proc 
+ + result = manager.stop("vllm") + + assert result is True + mock_proc.terminate.assert_called_once() + mock_proc.wait.assert_called_once() + assert "vllm" not in manager._procs + + +def test_stop_kills_process_that_wont_terminate(manager): + mock_proc = MagicMock() + mock_proc.wait.side_effect = Exception("timeout") + manager._procs["vllm"] = mock_proc + + result = manager.stop("vllm") + + assert result is True + mock_proc.kill.assert_called_once() + + +def test_stop_returns_true_when_no_proc_tracked(manager): + # No proc in _procs — still returns True (idempotent stop) + result = manager.stop("vllm") + assert result is True + + +def test_stop_returns_false_for_unknown_service(manager): + result = manager.stop("nonexistent") + assert result is False + + +# --------------------------------------------------------------------------- +# list_running / get_url +# --------------------------------------------------------------------------- + + +def test_list_running_returns_running_services(manager): + def _is_running(svc: str) -> bool: + return svc == "vllm" + + with patch.object(manager, "is_running", side_effect=_is_running): + running = manager.list_running() + + assert running == ["vllm"] + + +def test_get_url_returns_none_when_not_running(manager): + with patch.object(manager, "is_running", return_value=False): + assert manager.get_url("vllm") is None + + +def test_get_url_returns_url_when_running(manager): + with patch.object(manager, "is_running", return_value=True): + assert manager.get_url("vllm") == "http://127.0.0.1:8000" -- 2.45.2 From 2d095f0090f52b424bee4bb5180a31b73245bf33 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Thu, 2 Apr 2026 16:36:07 -0700 Subject: [PATCH 2/4] fix(llm-server): handle transformers 5.x BatchEncoding; use dtype kwarg - apply_chat_template() returns BatchEncoding in transformers 5.x (not bare tensor); extract .input_ids explicitly with fallback for 4.x compat - Switch from deprecated torch_dtype= to dtype= in from_pretrained() --- 
circuitforge_core/resources/inference/llm_server.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/circuitforge_core/resources/inference/llm_server.py b/circuitforge_core/resources/inference/llm_server.py index 932e860..a049e0f 100644 --- a/circuitforge_core/resources/inference/llm_server.py +++ b/circuitforge_core/resources/inference/llm_server.py @@ -62,11 +62,13 @@ def chat_completions(req: ChatRequest) -> dict[str, Any]: conversation = [{"role": m.role, "content": m.content} for m in req.messages] try: - input_ids = _tokenizer.apply_chat_template( + encoded = _tokenizer.apply_chat_template( conversation, return_tensors="pt", add_generation_prompt=True, - ).to(_device) + ) + # transformers 5.x returns BatchEncoding; 4.x returned a bare tensor + input_ids = (encoded.input_ids if hasattr(encoded, "input_ids") else encoded).to(_device) except Exception as exc: raise HTTPException(500, detail=f"Tokenisation failed: {exc}") @@ -113,7 +115,7 @@ def _load_model(model_path: str, gpu_id: int) -> None: _tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True) _model = AutoModelForCausalLM.from_pretrained( model_path, - torch_dtype=torch.float16 if "cuda" in _device else torch.float32, + dtype=torch.float16 if "cuda" in _device else torch.float32, device_map={"": _device}, trust_remote_code=True, ) -- 2.45.2 From bd132851ecacdb621b101751a87874279820cd50 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Thu, 2 Apr 2026 16:44:36 -0700 Subject: [PATCH 3/4] fix(orch): tighten VRAM pre-flight to require full max_mb free (not half) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit max_mb // 2 was too loose — Qwen2.5-3B needs ~5.9 GB on an 8 GB card but the threshold only required 3.25 GB free, allowing Ollama to hold 4.5 GB while a load attempt was still dispatched (causing OOM crash). 
- node_selector: can_fit = free_mb >= service_max_mb (was // 2) - coordinator /start: same threshold fix + updated error message - tests: two new node_selector tests pin the full-ceiling semantics; updated stale docstring in coordinator app test --- .../resources/coordinator/app.py | 8 +++--- .../resources/coordinator/node_selector.py | 2 +- tests/test_resources/test_coordinator_app.py | 4 +-- tests/test_resources/test_node_selector.py | 26 +++++++++++++++++++ 4 files changed, 33 insertions(+), 7 deletions(-) diff --git a/circuitforge_core/resources/coordinator/app.py b/circuitforge_core/resources/coordinator/app.py index c51f32d..b893652 100644 --- a/circuitforge_core/resources/coordinator/app.py +++ b/circuitforge_core/resources/coordinator/app.py @@ -227,12 +227,12 @@ def create_coordinator_app( service_max_mb = svc.max_mb break - # Filter candidates by VRAM headroom — skip models where free VRAM - # is less than half of the service's max_mb ceiling. - if service_max_mb > 0 and free_mb < service_max_mb // 2: + # Filter candidates by VRAM headroom — require free VRAM >= service ceiling + # so the model can actually load without competing for VRAM with other processes. 
+ if service_max_mb > 0 and free_mb < service_max_mb: raise HTTPException( 503, - detail=f"Insufficient VRAM on gpu {req.gpu_id}: {free_mb}MB free, need at least {service_max_mb // 2}MB", + detail=f"Insufficient VRAM on gpu {req.gpu_id}: {free_mb}MB free, need {service_max_mb}MB", ) last_error: str = "" diff --git a/circuitforge_core/resources/coordinator/node_selector.py b/circuitforge_core/resources/coordinator/node_selector.py index 9cdb9f4..52ab224 100644 --- a/circuitforge_core/resources/coordinator/node_selector.py +++ b/circuitforge_core/resources/coordinator/node_selector.py @@ -42,7 +42,7 @@ def select_node( for gpu in record.gpus: warm = f"{node_id}:{service}" in resident_keys effective = gpu.vram_free_mb + (_WARM_BONUS_MB if warm else 0) - can_fit = gpu.vram_free_mb >= service_max_mb // 2 + can_fit = gpu.vram_free_mb >= service_max_mb candidates.append(_Scored( node_id=node_id, gpu_id=gpu.gpu_id, diff --git a/tests/test_resources/test_coordinator_app.py b/tests/test_resources/test_coordinator_app.py index 49eacbf..598ea50 100644 --- a/tests/test_resources/test_coordinator_app.py +++ b/tests/test_resources/test_coordinator_app.py @@ -149,8 +149,8 @@ def test_single_gpu_8gb_profile_has_idle_stop_after_s(): def test_ensure_service_returns_503_when_vram_too_low(): - """VRAM pre-flight guard fires before any HTTP request when free VRAM < max_mb // 2.""" - # vllm max_mb = 5120 → threshold = 2560 MB; 100 MB free triggers 503. + """VRAM pre-flight guard fires before any HTTP request when free VRAM < service max_mb.""" + # Threshold = full max_mb (not half); 100 MB free on any profile triggers 503. 
lease_manager = LeaseManager() lease_manager.register_gpu("low-vram-node", 0, 512) profile_registry = ProfileRegistry() diff --git a/tests/test_resources/test_node_selector.py b/tests/test_resources/test_node_selector.py index 9e18a3a..50b500e 100644 --- a/tests/test_resources/test_node_selector.py +++ b/tests/test_resources/test_node_selector.py @@ -54,3 +54,29 @@ def test_returns_none_when_no_agents(): registry = ProfileRegistry() result = select_node({}, "vllm", registry, resident_keys=set()) assert result is None + + +def test_prefers_node_that_fully_fits_service_over_one_that_does_not(): + """can_fit requires free_mb >= service max_mb (full ceiling, not half). + 9500 MB guarantees above all profile ceilings (max is 9000); 1000 MB is below all. + """ + agents = { + "a": _make_agent("a", free_mb=1000), + "b": _make_agent("b", free_mb=9500), + } + registry = ProfileRegistry() + result = select_node(agents, "vllm", registry, resident_keys=set()) + # "b" is the only node in the preferred (can_fit) pool + assert result == ("b", 0) + + +def test_falls_back_to_best_effort_when_no_node_fully_fits(): + """When nothing can_fit, select_node returns the best-VRAM node as fallback.""" + agents = { + "a": _make_agent("a", free_mb=1000), + "b": _make_agent("b", free_mb=2000), + } + registry = ProfileRegistry() + # Neither has enough free VRAM; fallback picks highest effective_free_mb + result = select_node(agents, "vllm", registry, resident_keys=set()) + assert result == ("b", 0) -- 2.45.2 From a7290c124004fe3bef1c10c2e988741b003b2bfd Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Thu, 2 Apr 2026 17:18:16 -0700 Subject: [PATCH 4/4] =?UTF-8?q?feat(orch):=20background=20health=20probe?= =?UTF-8?q?=20loop=20=E2=80=94=20starting=20=E2=86=92=20running=20transiti?= =?UTF-8?q?on?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Coordinator now polls all 'starting' instances every 5 s via GET /health. On 200: state → running. 
After 300 s without a healthy response: state → stopped. Closes #10. --- .../resources/coordinator/app.py | 59 ++++++++++++++++++- 1 file changed, 57 insertions(+), 2 deletions(-) diff --git a/circuitforge_core/resources/coordinator/app.py b/circuitforge_core/resources/coordinator/app.py index b893652..f55cbb5 100644 --- a/circuitforge_core/resources/coordinator/app.py +++ b/circuitforge_core/resources/coordinator/app.py @@ -1,9 +1,14 @@ from __future__ import annotations +import logging +import time +import urllib.request from contextlib import asynccontextmanager from pathlib import Path from typing import Any +logger = logging.getLogger(__name__) + from fastapi import FastAPI, HTTPException from fastapi.responses import HTMLResponse from pydantic import BaseModel @@ -17,6 +22,54 @@ from circuitforge_core.resources.coordinator.service_registry import ServiceRegi _DASHBOARD_HTML = (Path(__file__).parent / "dashboard.html").read_text() +_PROBE_INTERVAL_S = 5.0 # how often to poll starting instances +_PROBE_TIMEOUT_S = 300.0 # give up and mark stopped after this many seconds + + +async def _run_instance_probe_loop(service_registry: ServiceRegistry) -> None: + """ + Background loop: transition 'starting' instances to 'running' once their + /health endpoint responds, or to 'stopped' after PROBE_TIMEOUT_S. 
+ """ + import asyncio + + start_times: dict[str, float] = {} # instance key → time first seen as starting + + while True: + await asyncio.sleep(_PROBE_INTERVAL_S) + now = time.time() + for inst in service_registry.all_instances(): + if inst.state != "starting": + start_times.pop(f"{inst.service}:{inst.node_id}:{inst.gpu_id}", None) + continue + key = f"{inst.service}:{inst.node_id}:{inst.gpu_id}" + start_times.setdefault(key, now) + + healthy = False + if inst.url: + try: + with urllib.request.urlopen( + inst.url.rstrip("/") + "/health", timeout=2.0 + ) as resp: + healthy = resp.status == 200 + except Exception: + pass + + if healthy: + service_registry.upsert_instance( + service=inst.service, node_id=inst.node_id, gpu_id=inst.gpu_id, + state="running", model=inst.model, url=inst.url, + ) + start_times.pop(key, None) + logger.info("Instance %s/%s gpu=%s transitioned to running", inst.service, inst.node_id, inst.gpu_id) + elif now - start_times[key] > _PROBE_TIMEOUT_S: + service_registry.upsert_instance( + service=inst.service, node_id=inst.node_id, gpu_id=inst.gpu_id, + state="stopped", model=inst.model, url=inst.url, + ) + start_times.pop(key, None) + logger.warning("Instance %s/%s gpu=%s timed out in starting state — marked stopped", inst.service, inst.node_id, inst.gpu_id) + class LeaseRequest(BaseModel): node_id: str @@ -61,10 +114,12 @@ def create_coordinator_app( @asynccontextmanager async def _lifespan(app: FastAPI): # type: ignore[type-arg] import asyncio - task = asyncio.create_task(agent_supervisor.run_heartbeat_loop()) + heartbeat_task = asyncio.create_task(agent_supervisor.run_heartbeat_loop()) + probe_task = asyncio.create_task(_run_instance_probe_loop(service_registry)) yield agent_supervisor.stop() - task.cancel() + heartbeat_task.cancel() + probe_task.cancel() app = FastAPI(title="cf-orch-coordinator", lifespan=_lifespan) -- 2.45.2