feat: ollama adopt-if-running + health_path in ProcessSpec (#16)
- ProcessSpec: adopt (bool) and health_path (str, default /health) fields
- ServiceManager: adopt=True probes health_path before spawning; is_running()
uses health probe for adopt services rather than proc table + socket check
- _probe_health() helper: urllib GET on localhost:port+path, returns bool
- Agent /services/{service}/start: returns adopted=True when service was
already running; coordinator sets state=running immediately (no probe wait)
- ServiceInstance: health_path field (default /health)
- service_registry.upsert_instance(): health_path kwarg
- Probe loop uses inst.health_path instead of hardcoded /health
- coordinator allocate_service: looks up health_path from profile spec via
_get_health_path() and stores on ServiceInstance
- All GPU profiles (2/4/6/8/16/24 GB + cpu-16/32): ollama managed block
with adopt=true, health_path=/api/tags, port 11434
- 11 new tests
This commit is contained in:
parent
a54a530493
commit
7bb6b76bd5
14 changed files with 288 additions and 5 deletions
|
|
@ -86,8 +86,12 @@ def create_agent_app(
|
|||
@app.post("/services/{service}/start")
|
||||
def start_service(service: str, req: ServiceStartRequest) -> dict:
|
||||
try:
|
||||
already_running = service_manager.is_running(service)
|
||||
url = service_manager.start(service, req.gpu_id, req.params)
|
||||
return {"service": service, "url": url, "running": True}
|
||||
# adopted=True signals the coordinator to treat this instance as
|
||||
# immediately running rather than waiting for the probe loop.
|
||||
adopted = already_running and service_manager.is_running(service)
|
||||
return {"service": service, "url": url, "running": True, "adopted": adopted}
|
||||
except (ValueError, NotImplementedError) as exc:
|
||||
raise HTTPException(status_code=422, detail=str(exc))
|
||||
except Exception as exc:
|
||||
|
|
|
|||
|
|
@ -67,6 +67,10 @@ class ServiceManager:
|
|||
except subprocess.CalledProcessError:
|
||||
return False
|
||||
if isinstance(spec, ProcessSpec):
|
||||
# For adopt=True services, check the health endpoint regardless of whether
|
||||
# we spawned the process (it may be a system daemon we didn't start).
|
||||
if spec.adopt:
|
||||
return self._probe_health(spec.host_port, spec.health_path)
|
||||
proc = self._procs.get(service)
|
||||
if proc is None or proc.poll() is not None:
|
||||
return False
|
||||
|
|
@ -78,6 +82,16 @@ class ServiceManager:
|
|||
return False
|
||||
return False
|
||||
|
||||
def _probe_health(self, port: int, health_path: str = "/health") -> bool:
|
||||
"""Return True if the service at localhost:port responds 200 on health_path."""
|
||||
import urllib.request
|
||||
try:
|
||||
url = f"http://127.0.0.1:{port}{health_path}"
|
||||
with urllib.request.urlopen(url, timeout=2.0) as resp:
|
||||
return resp.status == 200
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def start(self, service: str, gpu_id: int, params: dict[str, str]) -> str:
|
||||
spec = self._get_spec(service)
|
||||
if spec is None:
|
||||
|
|
@ -111,7 +125,10 @@ class ServiceManager:
|
|||
return f"http://{self.advertise_host}:{spec.host_port}"
|
||||
|
||||
if isinstance(spec, ProcessSpec):
|
||||
import shlex
|
||||
# adopt=True: if the service is already healthy, claim it without spawning.
|
||||
if spec.adopt and self._probe_health(spec.host_port, spec.health_path):
|
||||
return f"http://{self.advertise_host}:{spec.host_port}"
|
||||
|
||||
import subprocess as _sp
|
||||
|
||||
filler = defaultdict(str, params)
|
||||
|
|
|
|||
|
|
@ -19,9 +19,19 @@ from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
|
|||
from circuitforge_core.resources.coordinator.node_selector import select_node
|
||||
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
|
||||
from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry
|
||||
from circuitforge_core.resources.profiles.schema import ProcessSpec
|
||||
|
||||
_DASHBOARD_HTML = (Path(__file__).parent / "dashboard.html").read_text()
|
||||
|
||||
|
||||
def _get_health_path(profile_registry: ProfileRegistry, service: str) -> str:
|
||||
"""Return the health_path for a service from the first matching profile spec."""
|
||||
for profile in profile_registry.list_public():
|
||||
svc = profile.services.get(service)
|
||||
if svc and isinstance(svc.managed, ProcessSpec):
|
||||
return svc.managed.health_path
|
||||
return "/health"
|
||||
|
||||
_PROBE_INTERVAL_S = 5.0 # how often to poll starting instances
|
||||
_PROBE_TIMEOUT_S = 300.0 # give up and mark stopped after this many seconds
|
||||
|
||||
|
|
@ -49,7 +59,7 @@ async def _run_instance_probe_loop(service_registry: ServiceRegistry) -> None:
|
|||
if inst.url:
|
||||
try:
|
||||
with urllib.request.urlopen(
|
||||
inst.url.rstrip("/") + "/health", timeout=2.0
|
||||
inst.url.rstrip("/") + inst.health_path, timeout=2.0
|
||||
) as resp:
|
||||
healthy = resp.status == 200
|
||||
except Exception:
|
||||
|
|
@ -381,8 +391,11 @@ def create_coordinator_app(
|
|||
url=svc_url,
|
||||
ttl_s=req.ttl_s,
|
||||
)
|
||||
# Seed the instance state for first-time starts
|
||||
instance_state = "running" if warm else "starting"
|
||||
# Seed the instance state for first-time starts.
|
||||
# adopted=True means the agent found it already running.
|
||||
adopted = data.get("adopted", False)
|
||||
instance_state = "running" if (warm or adopted) else "starting"
|
||||
health_path = _get_health_path(profile_registry, service)
|
||||
service_registry.upsert_instance(
|
||||
service=service,
|
||||
node_id=node_id,
|
||||
|
|
@ -390,6 +403,7 @@ def create_coordinator_app(
|
|||
state=instance_state,
|
||||
model=model,
|
||||
url=svc_url,
|
||||
health_path=health_path,
|
||||
)
|
||||
return {
|
||||
"allocation_id": alloc.allocation_id,
|
||||
|
|
|
|||
|
|
@ -29,6 +29,7 @@ class ServiceInstance:
|
|||
model: str | None
|
||||
url: str | None
|
||||
idle_since: float | None = None
|
||||
health_path: str = "/health"
|
||||
|
||||
|
||||
class ServiceRegistry:
|
||||
|
|
@ -107,6 +108,7 @@ class ServiceRegistry:
|
|||
state: Literal["starting", "running", "idle", "stopped"],
|
||||
model: str | None,
|
||||
url: str | None,
|
||||
health_path: str = "/health",
|
||||
) -> ServiceInstance:
|
||||
key = f"{service}:{node_id}:{gpu_id}"
|
||||
existing = self._instances.get(key)
|
||||
|
|
@ -117,6 +119,7 @@ class ServiceRegistry:
|
|||
inst = ServiceInstance(
|
||||
service=service, node_id=node_id, gpu_id=gpu_id,
|
||||
state=state, model=model, url=url, idle_since=idle_since,
|
||||
health_path=health_path,
|
||||
)
|
||||
self._instances[key] = inst
|
||||
return inst
|
||||
|
|
|
|||
|
|
@ -5,6 +5,14 @@ services:
|
|||
ollama:
|
||||
max_mb: 0
|
||||
priority: 1
|
||||
managed:
|
||||
type: process
|
||||
adopt: true
|
||||
exec_path: "/usr/local/bin/ollama"
|
||||
args_template: "serve"
|
||||
port: 11434
|
||||
host_port: 11434
|
||||
health_path: /api/tags
|
||||
cf-stt:
|
||||
max_mb: 0
|
||||
priority: 2
|
||||
|
|
|
|||
|
|
@ -5,6 +5,14 @@ services:
|
|||
ollama:
|
||||
max_mb: 0
|
||||
priority: 1
|
||||
managed:
|
||||
type: process
|
||||
adopt: true
|
||||
exec_path: "/usr/local/bin/ollama"
|
||||
args_template: "serve"
|
||||
port: 11434
|
||||
host_port: 11434
|
||||
health_path: /api/tags
|
||||
cf-stt:
|
||||
max_mb: 0
|
||||
priority: 2
|
||||
|
|
|
|||
|
|
@ -17,6 +17,14 @@ services:
|
|||
ollama:
|
||||
max_mb: 12288
|
||||
priority: 1
|
||||
managed:
|
||||
type: process
|
||||
adopt: true
|
||||
exec_path: "/usr/local/bin/ollama"
|
||||
args_template: "serve"
|
||||
port: 11434
|
||||
host_port: 11434
|
||||
health_path: /api/tags
|
||||
cf-vision:
|
||||
max_mb: 3072
|
||||
priority: 2
|
||||
|
|
|
|||
|
|
@ -17,6 +17,14 @@ services:
|
|||
ollama:
|
||||
max_mb: 18432
|
||||
priority: 1
|
||||
managed:
|
||||
type: process
|
||||
adopt: true
|
||||
exec_path: "/usr/local/bin/ollama"
|
||||
args_template: "serve"
|
||||
port: 11434
|
||||
host_port: 11434
|
||||
health_path: /api/tags
|
||||
cf-vision:
|
||||
max_mb: 4096
|
||||
priority: 2
|
||||
|
|
|
|||
|
|
@ -6,6 +6,14 @@ services:
|
|||
ollama:
|
||||
max_mb: 1536
|
||||
priority: 1
|
||||
managed:
|
||||
type: process
|
||||
adopt: true
|
||||
exec_path: "/usr/local/bin/ollama"
|
||||
args_template: "serve"
|
||||
port: 11434
|
||||
host_port: 11434
|
||||
health_path: /api/tags
|
||||
cf-vision:
|
||||
max_mb: 512
|
||||
priority: 2
|
||||
|
|
|
|||
|
|
@ -6,6 +6,14 @@ services:
|
|||
ollama:
|
||||
max_mb: 3072
|
||||
priority: 1
|
||||
managed:
|
||||
type: process
|
||||
adopt: true
|
||||
exec_path: "/usr/local/bin/ollama"
|
||||
args_template: "serve"
|
||||
port: 11434
|
||||
host_port: 11434
|
||||
health_path: /api/tags
|
||||
cf-vision:
|
||||
max_mb: 1024
|
||||
priority: 2
|
||||
|
|
|
|||
|
|
@ -17,6 +17,14 @@ services:
|
|||
ollama:
|
||||
max_mb: 3584
|
||||
priority: 1
|
||||
managed:
|
||||
type: process
|
||||
adopt: true
|
||||
exec_path: "/usr/local/bin/ollama"
|
||||
args_template: "serve"
|
||||
port: 11434
|
||||
host_port: 11434
|
||||
health_path: /api/tags
|
||||
cf-vision:
|
||||
max_mb: 1536
|
||||
priority: 2
|
||||
|
|
|
|||
|
|
@ -17,6 +17,14 @@ services:
|
|||
ollama:
|
||||
max_mb: 4096
|
||||
priority: 1
|
||||
managed:
|
||||
type: process
|
||||
adopt: true
|
||||
exec_path: "/usr/local/bin/ollama"
|
||||
args_template: "serve"
|
||||
port: 11434
|
||||
host_port: 11434
|
||||
health_path: /api/tags
|
||||
cf-vision:
|
||||
max_mb: 2048
|
||||
priority: 2
|
||||
|
|
|
|||
|
|
@ -34,6 +34,11 @@ class ProcessSpec(BaseModel):
|
|||
env: dict[str, str] = Field(default_factory=dict)
|
||||
port: int = 0
|
||||
host_port: int = 0
|
||||
# adopt=True: if the service is already listening on host_port, claim it rather
|
||||
# than spawning a new process (useful for system daemons like Ollama).
|
||||
adopt: bool = False
|
||||
# Override the health probe path; defaults to /health (Ollama uses /api/tags).
|
||||
health_path: str = "/health"
|
||||
|
||||
model_config = {"frozen": True}
|
||||
|
||||
|
|
|
|||
176
tests/test_resources/test_ollama_adopt.py
Normal file
176
tests/test_resources/test_ollama_adopt.py
Normal file
|
|
@ -0,0 +1,176 @@
|
|||
# tests/test_resources/test_ollama_adopt.py
|
||||
"""
|
||||
Tests for the Ollama adopt-if-running path:
|
||||
- ProcessSpec: adopt and health_path fields parsed from YAML
|
||||
- ServiceManager.start(): adopt path claims running service; falls through if not running
|
||||
- ServiceManager.is_running(): adopt path uses health probe, not proc table
|
||||
- ServiceInstance.health_path persists through upsert_instance
|
||||
- Probe loop uses inst.health_path instead of hardcoded /health
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from circuitforge_core.resources.agent.service_manager import ServiceManager
|
||||
from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry
|
||||
from circuitforge_core.resources.profiles.schema import GpuProfile, ProcessSpec, ServiceProfile, load_profile
|
||||
|
||||
|
||||
# ── ProcessSpec schema ────────────────────────────────────────────────────────
|
||||
|
||||
def test_process_spec_defaults():
|
||||
spec = ProcessSpec(exec_path="/usr/local/bin/ollama")
|
||||
assert spec.adopt is False
|
||||
assert spec.health_path == "/health"
|
||||
|
||||
|
||||
def test_process_spec_adopt_fields():
|
||||
spec = ProcessSpec(
|
||||
exec_path="/usr/local/bin/ollama",
|
||||
adopt=True,
|
||||
health_path="/api/tags",
|
||||
port=11434,
|
||||
host_port=11434,
|
||||
)
|
||||
assert spec.adopt is True
|
||||
assert spec.health_path == "/api/tags"
|
||||
|
||||
|
||||
def test_profile_yaml_parses_adopt(tmp_path: Path):
|
||||
yaml_text = """\
|
||||
schema_version: 1
|
||||
name: test
|
||||
services:
|
||||
ollama:
|
||||
max_mb: 4096
|
||||
priority: 1
|
||||
managed:
|
||||
type: process
|
||||
adopt: true
|
||||
exec_path: /usr/local/bin/ollama
|
||||
args_template: serve
|
||||
port: 11434
|
||||
host_port: 11434
|
||||
health_path: /api/tags
|
||||
"""
|
||||
p = tmp_path / "profile.yaml"
|
||||
p.write_text(yaml_text)
|
||||
profile = load_profile(p)
|
||||
spec = profile.services["ollama"].managed
|
||||
assert isinstance(spec, ProcessSpec)
|
||||
assert spec.adopt is True
|
||||
assert spec.health_path == "/api/tags"
|
||||
assert spec.host_port == 11434
|
||||
|
||||
|
||||
# ── ServiceManager adopt path ─────────────────────────────────────────────────
|
||||
|
||||
def _make_manager_with_ollama(advertise_host: str = "127.0.0.1") -> ServiceManager:
|
||||
profile = GpuProfile(
|
||||
schema_version=1,
|
||||
name="test",
|
||||
services={
|
||||
"ollama": ServiceProfile(
|
||||
max_mb=4096,
|
||||
priority=1,
|
||||
managed=ProcessSpec(
|
||||
exec_path="/usr/local/bin/ollama",
|
||||
args_template="serve",
|
||||
port=11434,
|
||||
host_port=11434,
|
||||
adopt=True,
|
||||
health_path="/api/tags",
|
||||
),
|
||||
)
|
||||
},
|
||||
)
|
||||
return ServiceManager(node_id="heimdall", profile=profile, advertise_host=advertise_host)
|
||||
|
||||
|
||||
def test_start_adopt_claims_running_service():
|
||||
"""When Ollama is already healthy, start() returns its URL without spawning a process."""
|
||||
mgr = _make_manager_with_ollama()
|
||||
with patch.object(mgr, "_probe_health", return_value=True) as mock_probe:
|
||||
url = mgr.start("ollama", gpu_id=0, params={})
|
||||
assert url == "http://127.0.0.1:11434"
|
||||
mock_probe.assert_called_once_with(11434, "/api/tags")
|
||||
assert "ollama" not in mgr._procs # no subprocess spawned
|
||||
|
||||
|
||||
def test_start_adopt_spawns_when_not_running():
|
||||
"""When Ollama is not yet running, start() spawns it normally."""
|
||||
mgr = _make_manager_with_ollama()
|
||||
mock_proc = MagicMock()
|
||||
mock_proc.poll.return_value = None
|
||||
|
||||
with patch.object(mgr, "_probe_health", return_value=False), \
|
||||
patch("subprocess.Popen", return_value=mock_proc) as mock_popen:
|
||||
url = mgr.start("ollama", gpu_id=0, params={})
|
||||
|
||||
assert url == "http://127.0.0.1:11434"
|
||||
mock_popen.assert_called_once()
|
||||
assert "ollama" in mgr._procs
|
||||
|
||||
|
||||
def test_is_running_adopt_uses_health_probe():
|
||||
"""is_running() for adopt=True services checks the health endpoint, not the proc table."""
|
||||
mgr = _make_manager_with_ollama()
|
||||
with patch.object(mgr, "_probe_health", return_value=True):
|
||||
assert mgr.is_running("ollama") is True
|
||||
with patch.object(mgr, "_probe_health", return_value=False):
|
||||
assert mgr.is_running("ollama") is False
|
||||
|
||||
|
||||
def test_probe_health_returns_true_on_200():
|
||||
mgr = _make_manager_with_ollama()
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.status = 200
|
||||
mock_resp.__enter__ = lambda s: mock_resp
|
||||
mock_resp.__exit__ = MagicMock(return_value=False)
|
||||
|
||||
with patch("urllib.request.urlopen", return_value=mock_resp):
|
||||
assert mgr._probe_health(11434, "/api/tags") is True
|
||||
|
||||
|
||||
def test_probe_health_returns_false_on_connection_error():
|
||||
mgr = _make_manager_with_ollama()
|
||||
with patch("urllib.request.urlopen", side_effect=OSError("refused")):
|
||||
assert mgr._probe_health(11434, "/api/tags") is False
|
||||
|
||||
|
||||
# ── ServiceRegistry health_path ───────────────────────────────────────────────
|
||||
|
||||
def test_upsert_instance_stores_health_path():
|
||||
reg = ServiceRegistry()
|
||||
inst = reg.upsert_instance(
|
||||
service="ollama", node_id="heimdall", gpu_id=0,
|
||||
state="running", model=None, url="http://127.0.0.1:11434",
|
||||
health_path="/api/tags",
|
||||
)
|
||||
assert inst.health_path == "/api/tags"
|
||||
|
||||
|
||||
def test_upsert_instance_default_health_path():
|
||||
reg = ServiceRegistry()
|
||||
inst = reg.upsert_instance(
|
||||
service="vllm", node_id="heimdall", gpu_id=0,
|
||||
state="starting", model="qwen", url="http://127.0.0.1:8000",
|
||||
)
|
||||
assert inst.health_path == "/health"
|
||||
|
||||
|
||||
def test_all_gpu_profiles_have_ollama_managed_block():
|
||||
"""Sanity check: all public GPU profiles now have a managed block for ollama."""
|
||||
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
|
||||
registry = ProfileRegistry()
|
||||
for profile in registry.list_public():
|
||||
svc = profile.services.get("ollama")
|
||||
if svc is None:
|
||||
continue # profile may not define ollama
|
||||
assert svc.managed is not None, f"{profile.name}: ollama missing managed block"
|
||||
assert isinstance(svc.managed, ProcessSpec)
|
||||
assert svc.managed.adopt is True, f"{profile.name}: ollama adopt should be True"
|
||||
assert svc.managed.health_path == "/api/tags", f"{profile.name}: wrong health_path"
|
||||
Loading…
Reference in a new issue