From 7bb6b76bd5afc0080482b18674c44972b577dc61 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Thu, 2 Apr 2026 22:09:42 -0700 Subject: [PATCH] feat: ollama adopt-if-running + health_path in ProcessSpec (#16) - ProcessSpec: adopt (bool) and health_path (str, default /health) fields - ServiceManager: adopt=True probes health_path before spawning; is_running() uses health probe for adopt services rather than proc table + socket check - _probe_health() helper: urllib GET on localhost:port+path, returns bool - Agent /services/{service}/start: returns adopted=True when service was already running; coordinator sets state=running immediately (no probe wait) - ServiceInstance: health_path field (default /health) - service_registry.upsert_instance(): health_path kwarg - Probe loop uses inst.health_path instead of hardcoded /health - coordinator allocate_service: looks up health_path from profile spec via _get_health_path() and stores on ServiceInstance - All GPU profiles (2/4/6/8/16/24 GB + cpu-16/32): ollama managed block with adopt=true, health_path=/api/tags, port 11434 - 11 new tests --- circuitforge_core/resources/agent/app.py | 6 +- .../resources/agent/service_manager.py | 19 +- .../resources/coordinator/app.py | 20 +- .../resources/coordinator/service_registry.py | 3 + .../resources/profiles/public/cpu-16gb.yaml | 8 + .../resources/profiles/public/cpu-32gb.yaml | 8 + .../profiles/public/single-gpu-16gb.yaml | 8 + .../profiles/public/single-gpu-24gb.yaml | 8 + .../profiles/public/single-gpu-2gb.yaml | 8 + .../profiles/public/single-gpu-4gb.yaml | 8 + .../profiles/public/single-gpu-6gb.yaml | 8 + .../profiles/public/single-gpu-8gb.yaml | 8 + .../resources/profiles/schema.py | 5 + tests/test_resources/test_ollama_adopt.py | 176 ++++++++++++++++++ 14 files changed, 288 insertions(+), 5 deletions(-) create mode 100644 tests/test_resources/test_ollama_adopt.py diff --git a/circuitforge_core/resources/agent/app.py b/circuitforge_core/resources/agent/app.py index 0045041..162e7c5 100644 --- a/circuitforge_core/resources/agent/app.py +++ b/circuitforge_core/resources/agent/app.py @@ -86,8 +86,12 @@ def create_agent_app( @app.post("/services/{service}/start") def start_service(service: str, req: ServiceStartRequest) -> dict: try: + already_running = service_manager.is_running(service) url = service_manager.start(service, req.gpu_id, req.params) - return {"service": service, "url": url, "running": True} + # adopted=True signals the coordinator to treat this instance as + # immediately running rather than waiting for the probe loop. + adopted = already_running and service_manager.is_running(service) + return {"service": service, "url": url, "running": True, "adopted": adopted} except (ValueError, NotImplementedError) as exc: raise HTTPException(status_code=422, detail=str(exc)) except Exception as exc: diff --git a/circuitforge_core/resources/agent/service_manager.py b/circuitforge_core/resources/agent/service_manager.py index 336330b..5578c24 100644 --- a/circuitforge_core/resources/agent/service_manager.py +++ b/circuitforge_core/resources/agent/service_manager.py @@ -67,6 +67,10 @@ class ServiceManager: except subprocess.CalledProcessError: return False if isinstance(spec, ProcessSpec): + # For adopt=True services, check the health endpoint regardless of whether + # we spawned the process (it may be a system daemon we didn't start). + if spec.adopt: + return self._probe_health(spec.host_port, spec.health_path) proc = self._procs.get(service) if proc is None or proc.poll() is not None: return False @@ -78,6 +82,16 @@ class ServiceManager: return False return False + def _probe_health(self, port: int, health_path: str = "/health") -> bool: + """Return True if the service at localhost:port responds 200 on health_path.""" + import urllib.request + try: + url = f"http://127.0.0.1:{port}{health_path}" + with urllib.request.urlopen(url, timeout=2.0) as resp: + return resp.status == 200 + except Exception: + return False + def start(self, service: str, gpu_id: int, params: dict[str, str]) -> str: spec = self._get_spec(service) if spec is None: @@ -111,7 +125,10 @@ class ServiceManager: return f"http://{self.advertise_host}:{spec.host_port}" if isinstance(spec, ProcessSpec): - import shlex + # adopt=True: if the service is already healthy, claim it without spawning. + if spec.adopt and self._probe_health(spec.host_port, spec.health_path): + return f"http://{self.advertise_host}:{spec.host_port}" + import subprocess as _sp filler = defaultdict(str, params) diff --git a/circuitforge_core/resources/coordinator/app.py b/circuitforge_core/resources/coordinator/app.py index f55cbb5..1051d7b 100644 --- a/circuitforge_core/resources/coordinator/app.py +++ b/circuitforge_core/resources/coordinator/app.py @@ -19,9 +19,19 @@ from circuitforge_core.resources.coordinator.lease_manager import LeaseManager from circuitforge_core.resources.coordinator.node_selector import select_node from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry +from circuitforge_core.resources.profiles.schema import ProcessSpec _DASHBOARD_HTML = (Path(__file__).parent / "dashboard.html").read_text() + +def _get_health_path(profile_registry: ProfileRegistry, service: str) -> str: + """Return the health_path for a service from the first matching profile spec.""" + for profile in profile_registry.list_public(): + svc = profile.services.get(service) + if svc and isinstance(svc.managed, ProcessSpec): + return svc.managed.health_path + return "/health" + _PROBE_INTERVAL_S = 5.0 # how often to poll starting instances _PROBE_TIMEOUT_S = 300.0 # give up and mark stopped after this many seconds @@ -49,7 +59,7 @@ async def _run_instance_probe_loop(service_registry: ServiceRegistry) -> None: if inst.url: try: with urllib.request.urlopen( - inst.url.rstrip("/") + "/health", timeout=2.0 + inst.url.rstrip("/") + inst.health_path, timeout=2.0 ) as resp: healthy = resp.status == 200 except Exception: @@ -381,8 +391,11 @@ def create_coordinator_app( url=svc_url, ttl_s=req.ttl_s, ) - # Seed the instance state for first-time starts - instance_state = "running" if warm else "starting" + # Seed the instance state for first-time starts. + # adopted=True means the agent found it already running. + adopted = data.get("adopted", False) + instance_state = "running" if (warm or adopted) else "starting" + health_path = _get_health_path(profile_registry, service) service_registry.upsert_instance( service=service, node_id=node_id, @@ -390,6 +403,7 @@ def create_coordinator_app( state=instance_state, model=model, url=svc_url, + health_path=health_path, ) return { "allocation_id": alloc.allocation_id, diff --git a/circuitforge_core/resources/coordinator/service_registry.py b/circuitforge_core/resources/coordinator/service_registry.py index c258ad7..18c7b20 100644 --- a/circuitforge_core/resources/coordinator/service_registry.py +++ b/circuitforge_core/resources/coordinator/service_registry.py @@ -29,6 +29,7 @@ class ServiceInstance: model: str | None url: str | None idle_since: float | None = None + health_path: str = "/health" class ServiceRegistry: @@ -107,6 +108,7 @@ class ServiceRegistry: state: Literal["starting", "running", "idle", "stopped"], model: str | None, url: str | None, + health_path: str = "/health", ) -> ServiceInstance: key = f"{service}:{node_id}:{gpu_id}" existing = self._instances.get(key) @@ -117,6 +119,7 @@ class ServiceRegistry: inst = ServiceInstance( service=service, node_id=node_id, gpu_id=gpu_id, state=state, model=model, url=url, idle_since=idle_since, + health_path=health_path, ) self._instances[key] = inst return inst diff --git a/circuitforge_core/resources/profiles/public/cpu-16gb.yaml b/circuitforge_core/resources/profiles/public/cpu-16gb.yaml index 75a22e8..4ecec14 100644 --- a/circuitforge_core/resources/profiles/public/cpu-16gb.yaml +++ b/circuitforge_core/resources/profiles/public/cpu-16gb.yaml @@ -5,6 +5,14 @@ services: ollama: max_mb: 0 priority: 1 + managed: + type: process + adopt: true + exec_path: "/usr/local/bin/ollama" + args_template: "serve" + port: 11434 + host_port: 11434 + health_path: /api/tags cf-stt: max_mb: 0 priority: 2 diff --git a/circuitforge_core/resources/profiles/public/cpu-32gb.yaml b/circuitforge_core/resources/profiles/public/cpu-32gb.yaml index fb7c976..1ae4299 100644 --- a/circuitforge_core/resources/profiles/public/cpu-32gb.yaml +++ b/circuitforge_core/resources/profiles/public/cpu-32gb.yaml @@ -5,6 +5,14 @@ services: ollama: max_mb: 0 priority: 1 + managed: + type: process + adopt: true + exec_path: "/usr/local/bin/ollama" + args_template: "serve" + port: 11434 + host_port: 11434 + health_path: /api/tags cf-stt: max_mb: 0 priority: 2 diff --git a/circuitforge_core/resources/profiles/public/single-gpu-16gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-16gb.yaml index 457c063..70d533d 100644 --- a/circuitforge_core/resources/profiles/public/single-gpu-16gb.yaml +++ b/circuitforge_core/resources/profiles/public/single-gpu-16gb.yaml @@ -17,6 +17,14 @@ services: ollama: max_mb: 12288 priority: 1 + managed: + type: process + adopt: true + exec_path: "/usr/local/bin/ollama" + args_template: "serve" + port: 11434 + host_port: 11434 + health_path: /api/tags cf-vision: max_mb: 3072 priority: 2 diff --git a/circuitforge_core/resources/profiles/public/single-gpu-24gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-24gb.yaml index 5e933bc..073bf0e 100644 --- a/circuitforge_core/resources/profiles/public/single-gpu-24gb.yaml +++ b/circuitforge_core/resources/profiles/public/single-gpu-24gb.yaml @@ -17,6 +17,14 @@ services: ollama: max_mb: 18432 priority: 1 + managed: + type: process + adopt: true + exec_path: "/usr/local/bin/ollama" + args_template: "serve" + port: 11434 + host_port: 11434 + health_path: /api/tags cf-vision: max_mb: 4096 priority: 2 diff --git a/circuitforge_core/resources/profiles/public/single-gpu-2gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-2gb.yaml index d852eea..b845dbc 100644 --- a/circuitforge_core/resources/profiles/public/single-gpu-2gb.yaml +++ b/circuitforge_core/resources/profiles/public/single-gpu-2gb.yaml @@ -6,6 +6,14 @@ services: ollama: max_mb: 1536 priority: 1 + managed: + type: process + adopt: true + exec_path: "/usr/local/bin/ollama" + args_template: "serve" + port: 11434 + host_port: 11434 + health_path: /api/tags cf-vision: max_mb: 512 priority: 2 diff --git a/circuitforge_core/resources/profiles/public/single-gpu-4gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-4gb.yaml index b7cb24e..1bec3e3 100644 --- a/circuitforge_core/resources/profiles/public/single-gpu-4gb.yaml +++ b/circuitforge_core/resources/profiles/public/single-gpu-4gb.yaml @@ -6,6 +6,14 @@ services: ollama: max_mb: 3072 priority: 1 + managed: + type: process + adopt: true + exec_path: "/usr/local/bin/ollama" + args_template: "serve" + port: 11434 + host_port: 11434 + health_path: /api/tags cf-vision: max_mb: 1024 priority: 2 diff --git a/circuitforge_core/resources/profiles/public/single-gpu-6gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-6gb.yaml index 1e8d6b3..3446e54 100644 --- a/circuitforge_core/resources/profiles/public/single-gpu-6gb.yaml +++ b/circuitforge_core/resources/profiles/public/single-gpu-6gb.yaml @@ -17,6 +17,14 @@ services: ollama: max_mb: 3584 priority: 1 + managed: + type: process + adopt: true + exec_path: "/usr/local/bin/ollama" + args_template: "serve" + port: 11434 + host_port: 11434 + health_path: /api/tags cf-vision: max_mb: 1536 priority: 2 diff --git a/circuitforge_core/resources/profiles/public/single-gpu-8gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-8gb.yaml index ae3b170..23ab8d5 100644 --- a/circuitforge_core/resources/profiles/public/single-gpu-8gb.yaml +++ b/circuitforge_core/resources/profiles/public/single-gpu-8gb.yaml @@ -17,6 +17,14 @@ services: ollama: max_mb: 4096 priority: 1 + managed: + type: process + adopt: true + exec_path: "/usr/local/bin/ollama" + args_template: "serve" + port: 11434 + host_port: 11434 + health_path: /api/tags cf-vision: max_mb: 2048 priority: 2 diff --git a/circuitforge_core/resources/profiles/schema.py b/circuitforge_core/resources/profiles/schema.py index 1ba8ee5..2667916 100644 --- a/circuitforge_core/resources/profiles/schema.py +++ b/circuitforge_core/resources/profiles/schema.py @@ -34,6 +34,11 @@ class ProcessSpec(BaseModel): env: dict[str, str] = Field(default_factory=dict) port: int = 0 host_port: int = 0 + # adopt=True: if the service is already listening on host_port, claim it rather + # than spawning a new process (useful for system daemons like Ollama). + adopt: bool = False + # Override the health probe path; defaults to /health (Ollama uses /api/tags). + health_path: str = "/health" model_config = {"frozen": True} diff --git a/tests/test_resources/test_ollama_adopt.py b/tests/test_resources/test_ollama_adopt.py new file mode 100644 index 0000000..ceaae12 --- /dev/null +++ b/tests/test_resources/test_ollama_adopt.py @@ -0,0 +1,176 @@ +# tests/test_resources/test_ollama_adopt.py +""" +Tests for the Ollama adopt-if-running path: + - ProcessSpec: adopt and health_path fields parsed from YAML + - ServiceManager.start(): adopt path claims running service; falls through if not running + - ServiceManager.is_running(): adopt path uses health probe, not proc table + - ServiceInstance.health_path persists through upsert_instance + - Probe loop uses inst.health_path instead of hardcoded /health +""" +from __future__ import annotations + +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from circuitforge_core.resources.agent.service_manager import ServiceManager +from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry +from circuitforge_core.resources.profiles.schema import GpuProfile, ProcessSpec, ServiceProfile, load_profile + + +# ── ProcessSpec schema ──────────────────────────────────────────────────────── + +def test_process_spec_defaults(): + spec = ProcessSpec(exec_path="/usr/local/bin/ollama") + assert spec.adopt is False + assert spec.health_path == "/health" + + +def test_process_spec_adopt_fields(): + spec = ProcessSpec( + exec_path="/usr/local/bin/ollama", + adopt=True, + health_path="/api/tags", + port=11434, + host_port=11434, + ) + assert spec.adopt is True + assert spec.health_path == "/api/tags" + + +def test_profile_yaml_parses_adopt(tmp_path: Path): + yaml_text = """\ +schema_version: 1 +name: test +services: + ollama: + max_mb: 4096 + priority: 1 + managed: + type: process + adopt: true + exec_path: /usr/local/bin/ollama + args_template: serve + port: 11434 + host_port: 11434 + health_path: /api/tags +""" + p = tmp_path / "profile.yaml" + p.write_text(yaml_text) + profile = load_profile(p) + spec = profile.services["ollama"].managed + assert isinstance(spec, ProcessSpec) + assert spec.adopt is True + assert spec.health_path == "/api/tags" + assert spec.host_port == 11434 + + +# ── ServiceManager adopt path ───────────────────────────────────────────────── + +def _make_manager_with_ollama(advertise_host: str = "127.0.0.1") -> ServiceManager: + profile = GpuProfile( + schema_version=1, + name="test", + services={ + "ollama": ServiceProfile( + max_mb=4096, + priority=1, + managed=ProcessSpec( + exec_path="/usr/local/bin/ollama", + args_template="serve", + port=11434, + host_port=11434, + adopt=True, + health_path="/api/tags", + ), + ) + }, + ) + return ServiceManager(node_id="heimdall", profile=profile, advertise_host=advertise_host) + + +def test_start_adopt_claims_running_service(): + """When Ollama is already healthy, start() returns its URL without spawning a process.""" + mgr = _make_manager_with_ollama() + with patch.object(mgr, "_probe_health", return_value=True) as mock_probe: + url = mgr.start("ollama", gpu_id=0, params={}) + assert url == "http://127.0.0.1:11434" + mock_probe.assert_called_once_with(11434, "/api/tags") + assert "ollama" not in mgr._procs # no subprocess spawned + + +def test_start_adopt_spawns_when_not_running(): + """When Ollama is not yet running, start() spawns it normally.""" + mgr = _make_manager_with_ollama() + mock_proc = MagicMock() + mock_proc.poll.return_value = None + + with patch.object(mgr, "_probe_health", return_value=False), \ + patch("subprocess.Popen", return_value=mock_proc) as mock_popen: + url = mgr.start("ollama", gpu_id=0, params={}) + + assert url == "http://127.0.0.1:11434" + mock_popen.assert_called_once() + assert "ollama" in mgr._procs + + +def test_is_running_adopt_uses_health_probe(): + """is_running() for adopt=True services checks the health endpoint, not the proc table.""" + mgr = _make_manager_with_ollama() + with patch.object(mgr, "_probe_health", return_value=True): + assert mgr.is_running("ollama") is True + with patch.object(mgr, "_probe_health", return_value=False): + assert mgr.is_running("ollama") is False + + +def test_probe_health_returns_true_on_200(): + mgr = _make_manager_with_ollama() + mock_resp = MagicMock() + mock_resp.status = 200 + mock_resp.__enter__ = lambda s: mock_resp + mock_resp.__exit__ = MagicMock(return_value=False) + + with patch("urllib.request.urlopen", return_value=mock_resp): + assert mgr._probe_health(11434, "/api/tags") is True + + +def test_probe_health_returns_false_on_connection_error(): + mgr = _make_manager_with_ollama() + with patch("urllib.request.urlopen", side_effect=OSError("refused")): + assert mgr._probe_health(11434, "/api/tags") is False + + +# ── ServiceRegistry health_path ─────────────────────────────────────────────── + +def test_upsert_instance_stores_health_path(): + reg = ServiceRegistry() + inst = reg.upsert_instance( + service="ollama", node_id="heimdall", gpu_id=0, + state="running", model=None, url="http://127.0.0.1:11434", + health_path="/api/tags", + ) + assert inst.health_path == "/api/tags" + + +def test_upsert_instance_default_health_path(): + reg = ServiceRegistry() + inst = reg.upsert_instance( + service="vllm", node_id="heimdall", gpu_id=0, + state="starting", model="qwen", url="http://127.0.0.1:8000", + ) + assert inst.health_path == "/health" + + +def test_all_gpu_profiles_have_ollama_managed_block(): + """Sanity check: all public GPU profiles now have a managed block for ollama.""" + from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry + registry = ProfileRegistry() + for profile in registry.list_public(): + svc = profile.services.get("ollama") + if svc is None: + continue # profile may not define ollama + assert svc.managed is not None, f"{profile.name}: ollama missing managed block" + assert isinstance(svc.managed, ProcessSpec) + assert svc.managed.adopt is True, f"{profile.name}: ollama adopt should be True" + assert svc.managed.health_path == "/api/tags", f"{profile.name}: wrong health_path"