Merge pull request 'feat: agent watchdog + Ollama adopt-if-running' (#17) from feature/agent-watchdog into main

This commit is contained in:
pyr0ball 2026-04-02 22:12:32 -07:00
commit d45d4e1de6
19 changed files with 684 additions and 25 deletions

View file

@ -86,8 +86,12 @@ def create_agent_app(
@app.post("/services/{service}/start")
def start_service(service: str, req: ServiceStartRequest) -> dict:
try:
already_running = service_manager.is_running(service)
url = service_manager.start(service, req.gpu_id, req.params)
return {"service": service, "url": url, "running": True}
# adopted=True signals the coordinator to treat this instance as
# immediately running rather than waiting for the probe loop.
adopted = already_running and service_manager.is_running(service)
return {"service": service, "url": url, "running": True, "adopted": adopted}
except (ValueError, NotImplementedError) as exc:
raise HTTPException(status_code=422, detail=str(exc))
except Exception as exc:

View file

@ -67,6 +67,10 @@ class ServiceManager:
except subprocess.CalledProcessError:
return False
if isinstance(spec, ProcessSpec):
# For adopt=True services, check the health endpoint regardless of whether
# we spawned the process (it may be a system daemon we didn't start).
if spec.adopt:
return self._probe_health(spec.host_port, spec.health_path)
proc = self._procs.get(service)
if proc is None or proc.poll() is not None:
return False
@ -78,6 +82,16 @@ class ServiceManager:
return False
return False
def _probe_health(self, port: int, health_path: str = "/health") -> bool:
    """Probe the service bound on localhost:*port*.

    Issues a GET against *health_path* with a 2 s timeout and reports
    True only for an HTTP 200 answer; any error (connection refused,
    timeout, non-HTTP response) is treated as "not healthy".
    """
    import urllib.request

    target = f"http://127.0.0.1:{port}{health_path}"
    try:
        with urllib.request.urlopen(target, timeout=2.0) as response:
            healthy = response.status == 200
    except Exception:
        # Unreachable/broken service — report unhealthy rather than raise.
        healthy = False
    return healthy
def start(self, service: str, gpu_id: int, params: dict[str, str]) -> str:
spec = self._get_spec(service)
if spec is None:
@ -111,7 +125,10 @@ class ServiceManager:
return f"http://{self.advertise_host}:{spec.host_port}"
if isinstance(spec, ProcessSpec):
import shlex
# adopt=True: if the service is already healthy, claim it without spawning.
if spec.adopt and self._probe_health(spec.host_port, spec.health_path):
return f"http://{self.advertise_host}:{spec.host_port}"
import subprocess as _sp
filler = defaultdict(str, params)

View file

@ -1,5 +1,6 @@
from __future__ import annotations
import logging
import sys
from pathlib import Path
from typing import Annotated, Optional
@ -7,6 +8,8 @@ from typing import Annotated, Optional
import typer
import uvicorn
logger = logging.getLogger(__name__)
app = typer.Typer(name="cf-orch", help="CircuitForge GPU resource orchestrator")
_SYSTEMD_UNIT_PATH = Path("/etc/systemd/system/cf-orch.service")
@ -47,14 +50,21 @@ def start(
from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry
from circuitforge_core.resources.agent.gpu_monitor import GpuMonitor
from circuitforge_core.resources.coordinator.node_store import NodeStore
lease_manager = LeaseManager()
profile_registry = ProfileRegistry()
service_registry = ServiceRegistry()
node_store = NodeStore()
supervisor = AgentSupervisor(
lease_manager=lease_manager,
service_registry=service_registry,
profile_registry=profile_registry,
node_store=node_store,
)
restored = supervisor.restore_from_store()
if restored:
typer.echo(f"Restored {restored} known node(s) from previous session")
monitor = GpuMonitor()
gpus = monitor.poll()
@ -119,27 +129,43 @@ def agent(
reach_host = advertise_host or ("127.0.0.1" if host in ("0.0.0.0", "::") else host)
agent_url = f"http://{reach_host}:{port}"
def _register_in_background() -> None:
"""POST registration to coordinator after a short delay (uvicorn needs ~1s to bind)."""
import time
time.sleep(2.0)
try:
resp = httpx.post(
f"{coordinator}/api/nodes",
json={"node_id": node_id, "agent_url": agent_url},
timeout=5.0,
)
if resp.is_success:
typer.echo(f"Registered with coordinator at {coordinator} as '{node_id}'")
else:
typer.echo(
f"Warning: coordinator registration returned {resp.status_code}", err=True
)
except Exception as exc:
typer.echo(f"Warning: could not reach coordinator at {coordinator}: {exc}", err=True)
_RECONNECT_INTERVAL_S = 30.0
# Fire registration in a daemon thread so uvicorn.run() can start blocking immediately.
threading.Thread(target=_register_in_background, daemon=True).start()
def _reconnect_loop() -> None:
    """
    Keep this agent registered with the coordinator for the process lifetime.

    Runs as a daemon thread:
    - sleeps 2 s before the first attempt (uvicorn needs a moment to bind),
      then re-registers every _RECONNECT_INTERVAL_S seconds
    - an unreachable coordinator is logged at debug level and retried —
      never crashes the agent
    - after a coordinator restart the agent re-appears within one cycle,
      so no manual intervention is needed on agent hosts
    """
    import time

    delay = 2.0
    while True:
        time.sleep(delay)
        delay = _RECONNECT_INTERVAL_S
        try:
            resp = httpx.post(
                f"{coordinator}/api/nodes",
                json={"node_id": node_id, "agent_url": agent_url},
                timeout=5.0,
            )
            if resp.is_success:
                logger.debug("Registered with coordinator at %s as '%s'", coordinator, node_id)
            else:
                logger.warning("Coordinator registration returned %s", resp.status_code)
        except Exception as exc:
            # Expected while the coordinator is down; the next cycle retries.
            logger.debug("Coordinator at %s unreachable, will retry: %s", coordinator, exc)
# Fire reconnect loop in a daemon thread so uvicorn.run() can start blocking immediately.
threading.Thread(target=_reconnect_loop, daemon=True, name="cf-orch-reconnect").start()
typer.echo(f"Reconnect loop started — will register with {coordinator} every {int(_RECONNECT_INTERVAL_S)}s")
service_manager = None
try:

View file

@ -8,6 +8,7 @@ from dataclasses import dataclass, field
import httpx
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
from circuitforge_core.resources.coordinator.node_store import NodeStore
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry
from circuitforge_core.resources.models import GpuInfo, NodeInfo, ResidentAllocation
@ -33,14 +34,38 @@ class AgentSupervisor:
lease_manager: LeaseManager,
service_registry: ServiceRegistry | None = None,
profile_registry: ProfileRegistry | None = None,
node_store: NodeStore | None = None,
) -> None:
self._agents: dict[str, AgentRecord] = {}
self._lease_manager = lease_manager
self._running = False
self._service_registry = service_registry
self._profile_registry = profile_registry
self._node_store = node_store
self._heartbeat_tick = 0
def restore_from_store(self) -> int:
    """
    Load previously-known nodes from NodeStore into the in-memory registry.

    Restored nodes start offline (online=False); the heartbeat loop will poll
    them on its first tick and promote any that respond to online=True.

    Returns the number of nodes restored (0 when no NodeStore is configured).
    """
    if self._node_store is None:
        return 0
    restored = 0
    for node_id, agent_url in self._node_store.all():
        # Skip nodes that already registered live in this session so they
        # are neither clobbered nor double-counted.
        if node_id not in self._agents:
            self._agents[node_id] = AgentRecord(
                node_id=node_id, agent_url=agent_url, online=False
            )
            restored += 1
    if restored:
        logger.info("NodeStore: restored %d known node(s) from previous session", restored)
    return restored
def register(self, node_id: str, agent_url: str) -> None:
if node_id not in self._agents:
self._agents[node_id] = AgentRecord(node_id=node_id, agent_url=agent_url)
@ -49,6 +74,8 @@ class AgentSupervisor:
if self._agents[node_id].agent_url != agent_url:
self._agents[node_id].agent_url = agent_url
logger.info("Updated agent URL for %s%s", node_id, agent_url)
if self._node_store is not None:
self._node_store.upsert(node_id, agent_url)
def get_node_info(self, node_id: str) -> NodeInfo | None:
record = self._agents.get(node_id)

View file

@ -19,9 +19,19 @@ from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
from circuitforge_core.resources.coordinator.node_selector import select_node
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry
from circuitforge_core.resources.profiles.schema import ProcessSpec
_DASHBOARD_HTML = (Path(__file__).parent / "dashboard.html").read_text()
def _get_health_path(profile_registry: ProfileRegistry, service: str) -> str:
    """Look up *service* across public profiles and return its configured
    ProcessSpec health_path; falls back to "/health" when no profile
    defines a matching process spec."""
    for profile in profile_registry.list_public():
        candidate = profile.services.get(service)
        if candidate is None:
            continue
        if isinstance(candidate.managed, ProcessSpec):
            return candidate.managed.health_path
    return "/health"
_PROBE_INTERVAL_S = 5.0 # how often to poll starting instances
_PROBE_TIMEOUT_S = 300.0 # give up and mark stopped after this many seconds
@ -49,7 +59,7 @@ async def _run_instance_probe_loop(service_registry: ServiceRegistry) -> None:
if inst.url:
try:
with urllib.request.urlopen(
inst.url.rstrip("/") + "/health", timeout=2.0
inst.url.rstrip("/") + inst.health_path, timeout=2.0
) as resp:
healthy = resp.status == 200
except Exception:
@ -381,8 +391,11 @@ def create_coordinator_app(
url=svc_url,
ttl_s=req.ttl_s,
)
# Seed the instance state for first-time starts
instance_state = "running" if warm else "starting"
# Seed the instance state for first-time starts.
# adopted=True means the agent found it already running.
adopted = data.get("adopted", False)
instance_state = "running" if (warm or adopted) else "starting"
health_path = _get_health_path(profile_registry, service)
service_registry.upsert_instance(
service=service,
node_id=node_id,
@ -390,6 +403,7 @@ def create_coordinator_app(
state=instance_state,
model=model,
url=svc_url,
health_path=health_path,
)
return {
"allocation_id": alloc.allocation_id,

View file

@ -0,0 +1,85 @@
"""
circuitforge_core.resources.coordinator.node_store — SQLite persistence for known agent nodes.
Gives the coordinator restart-safe memory of which nodes have ever registered.
On startup the coordinator reloads all known nodes and immediately probes them;
nodes that respond come back online within one heartbeat cycle (~10 s) without
any manual intervention on the agent hosts.
"""
from __future__ import annotations
import logging
import sqlite3
import time
from pathlib import Path
logger = logging.getLogger(__name__)
_DEFAULT_DB_PATH = Path.home() / ".local" / "share" / "circuitforge" / "cf-orch-nodes.db"
_STALE_AGE_DAYS = 30 # nodes unseen for this long are pruned automatically
class NodeStore:
    """
    SQLite-backed registry of agent nodes that survives coordinator restarts.

    Intended for single-writer use from the coordinator's one asyncio thread
    (the connection is opened with check_same_thread=False to allow access
    from helper threads, but no additional locking is done here).
    """

    def __init__(self, db_path: Path = _DEFAULT_DB_PATH) -> None:
        """Open (and create if needed) the node database at *db_path*."""
        self.db_path = db_path
        # The data directory may not exist yet on a fresh install.
        db_path.parent.mkdir(parents=True, exist_ok=True)
        self._conn = sqlite3.connect(str(db_path), check_same_thread=False)
        self._conn.row_factory = sqlite3.Row
        self._migrate()
        logger.debug("NodeStore initialised at %s", db_path)

    def _migrate(self) -> None:
        """Create the schema if it does not exist yet (idempotent)."""
        self._conn.executescript("""
            CREATE TABLE IF NOT EXISTS known_nodes (
                node_id    TEXT PRIMARY KEY,
                agent_url  TEXT NOT NULL,
                last_seen  REAL NOT NULL
            );
        """)
        self._conn.commit()

    def upsert(self, node_id: str, agent_url: str) -> None:
        """Record or update a node. Called on every successful registration."""
        now = time.time()
        self._conn.execute(
            """
            INSERT INTO known_nodes (node_id, agent_url, last_seen)
            VALUES (?, ?, ?)
            ON CONFLICT(node_id) DO UPDATE SET
                agent_url = excluded.agent_url,
                last_seen = excluded.last_seen
            """,
            (node_id, agent_url, now),
        )
        self._conn.commit()

    def all(self) -> list[tuple[str, str]]:
        """Return every known (node_id, agent_url) pair, most recent first."""
        cursor = self._conn.execute(
            "SELECT node_id, agent_url FROM known_nodes ORDER BY last_seen DESC"
        )
        return [(record["node_id"], record["agent_url"]) for record in cursor.fetchall()]

    def remove(self, node_id: str) -> None:
        """Forget a node entirely."""
        self._conn.execute("DELETE FROM known_nodes WHERE node_id = ?", (node_id,))
        self._conn.commit()

    def prune_stale(self, max_age_days: int = _STALE_AGE_DAYS) -> int:
        """Delete nodes not seen within *max_age_days*. Returns count removed."""
        cutoff = time.time() - max_age_days * 86400
        cursor = self._conn.execute(
            "DELETE FROM known_nodes WHERE last_seen < ?", (cutoff,)
        )
        self._conn.commit()
        removed = cursor.rowcount
        if removed:
            logger.info("NodeStore: pruned %d stale node(s) (>%d days old)", removed, max_age_days)
        return removed

    def close(self) -> None:
        """Close the underlying SQLite connection."""
        self._conn.close()

View file

@ -29,6 +29,7 @@ class ServiceInstance:
model: str | None
url: str | None
idle_since: float | None = None
health_path: str = "/health"
class ServiceRegistry:
@ -107,6 +108,7 @@ class ServiceRegistry:
state: Literal["starting", "running", "idle", "stopped"],
model: str | None,
url: str | None,
health_path: str = "/health",
) -> ServiceInstance:
key = f"{service}:{node_id}:{gpu_id}"
existing = self._instances.get(key)
@ -117,6 +119,7 @@ class ServiceRegistry:
inst = ServiceInstance(
service=service, node_id=node_id, gpu_id=gpu_id,
state=state, model=model, url=url, idle_since=idle_since,
health_path=health_path,
)
self._instances[key] = inst
return inst

View file

@ -5,6 +5,14 @@ services:
ollama:
max_mb: 0
priority: 1
managed:
type: process
adopt: true
exec_path: "/usr/local/bin/ollama"
args_template: "serve"
port: 11434
host_port: 11434
health_path: /api/tags
cf-stt:
max_mb: 0
priority: 2

View file

@ -5,6 +5,14 @@ services:
ollama:
max_mb: 0
priority: 1
managed:
type: process
adopt: true
exec_path: "/usr/local/bin/ollama"
args_template: "serve"
port: 11434
host_port: 11434
health_path: /api/tags
cf-stt:
max_mb: 0
priority: 2

View file

@ -17,6 +17,14 @@ services:
ollama:
max_mb: 12288
priority: 1
managed:
type: process
adopt: true
exec_path: "/usr/local/bin/ollama"
args_template: "serve"
port: 11434
host_port: 11434
health_path: /api/tags
cf-vision:
max_mb: 3072
priority: 2

View file

@ -17,6 +17,14 @@ services:
ollama:
max_mb: 18432
priority: 1
managed:
type: process
adopt: true
exec_path: "/usr/local/bin/ollama"
args_template: "serve"
port: 11434
host_port: 11434
health_path: /api/tags
cf-vision:
max_mb: 4096
priority: 2

View file

@ -6,6 +6,14 @@ services:
ollama:
max_mb: 1536
priority: 1
managed:
type: process
adopt: true
exec_path: "/usr/local/bin/ollama"
args_template: "serve"
port: 11434
host_port: 11434
health_path: /api/tags
cf-vision:
max_mb: 512
priority: 2

View file

@ -6,6 +6,14 @@ services:
ollama:
max_mb: 3072
priority: 1
managed:
type: process
adopt: true
exec_path: "/usr/local/bin/ollama"
args_template: "serve"
port: 11434
host_port: 11434
health_path: /api/tags
cf-vision:
max_mb: 1024
priority: 2

View file

@ -17,6 +17,14 @@ services:
ollama:
max_mb: 3584
priority: 1
managed:
type: process
adopt: true
exec_path: "/usr/local/bin/ollama"
args_template: "serve"
port: 11434
host_port: 11434
health_path: /api/tags
cf-vision:
max_mb: 1536
priority: 2

View file

@ -17,6 +17,14 @@ services:
ollama:
max_mb: 4096
priority: 1
managed:
type: process
adopt: true
exec_path: "/usr/local/bin/ollama"
args_template: "serve"
port: 11434
host_port: 11434
health_path: /api/tags
cf-vision:
max_mb: 2048
priority: 2

View file

@ -34,6 +34,11 @@ class ProcessSpec(BaseModel):
env: dict[str, str] = Field(default_factory=dict)
port: int = 0
host_port: int = 0
# adopt=True: if the service is already listening on host_port, claim it rather
# than spawning a new process (useful for system daemons like Ollama).
adopt: bool = False
# Override the health probe path; defaults to /health (Ollama uses /api/tags).
health_path: str = "/health"
model_config = {"frozen": True}

View file

@ -0,0 +1,151 @@
# tests/test_resources/test_agent_watchdog.py
"""
Tests for AgentSupervisor watchdog behaviour:
- restore_from_store() reloads known nodes from NodeStore on startup
- register() persists to NodeStore
- restored nodes start offline and come online after a successful poll
- NodeStore=None path is a no-op (backwards compatibility)
"""
from __future__ import annotations
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
from circuitforge_core.resources.coordinator.node_store import NodeStore
# ── fixtures ──────────────────────────────────────────────────────────────────

@pytest.fixture
def store(tmp_path: Path) -> NodeStore:
    # Fresh on-disk store per test; tmp_path keeps each test isolated.
    return NodeStore(db_path=tmp_path / "nodes.db")


@pytest.fixture
def supervisor(store: NodeStore) -> AgentSupervisor:
    # Supervisor wired to the per-test store.
    return AgentSupervisor(lease_manager=LeaseManager(), node_store=store)


@pytest.fixture
def supervisor_no_store() -> AgentSupervisor:
    # Legacy configuration: no persistence at all.
    return AgentSupervisor(lease_manager=LeaseManager(), node_store=None)


# ── register() persists ───────────────────────────────────────────────────────

def test_register_persists_to_store(supervisor: AgentSupervisor, store: NodeStore) -> None:
    """A successful register() writes the node through to the NodeStore."""
    supervisor.register("heimdall", "http://127.0.0.1:7701")
    rows = store.all()
    assert len(rows) == 1
    assert rows[0] == ("heimdall", "http://127.0.0.1:7701")


def test_register_updates_url_in_store(supervisor: AgentSupervisor, store: NodeStore) -> None:
    """Re-registering with a new URL updates the persisted row, not duplicates it."""
    supervisor.register("navi", "http://10.1.10.10:7701")
    supervisor.register("navi", "http://10.1.10.10:9999")
    rows = store.all()
    assert len(rows) == 1
    assert rows[0][1] == "http://10.1.10.10:9999"


def test_register_without_store_does_not_crash(supervisor_no_store: AgentSupervisor) -> None:
    """node_store=None must remain a supported (no-op persistence) configuration."""
    supervisor_no_store.register("heimdall", "http://127.0.0.1:7701")
    assert supervisor_no_store.get_node_info("heimdall") is not None


# ── restore_from_store() ──────────────────────────────────────────────────────

def test_restore_loads_known_nodes(tmp_path: Path) -> None:
    """Nodes written by a previous supervisor session are restored into a fresh one."""
    db = tmp_path / "nodes.db"
    # Session 1: register two nodes
    s1 = NodeStore(db_path=db)
    sup1 = AgentSupervisor(lease_manager=LeaseManager(), node_store=s1)
    sup1.register("navi", "http://10.1.10.10:7701")
    sup1.register("strahl", "http://10.1.10.20:7701")
    # Session 2: fresh supervisor, same DB
    s2 = NodeStore(db_path=db)
    sup2 = AgentSupervisor(lease_manager=LeaseManager(), node_store=s2)
    restored = sup2.restore_from_store()
    assert restored == 2
    assert sup2.get_node_info("navi") is not None
    assert sup2.get_node_info("strahl") is not None


def test_restore_marks_nodes_offline(tmp_path: Path) -> None:
    """Restored nodes start offline — they haven't been polled yet."""
    db = tmp_path / "nodes.db"
    s1 = NodeStore(db_path=db)
    AgentSupervisor(lease_manager=LeaseManager(), node_store=s1).register(
        "navi", "http://10.1.10.10:7701"
    )
    s2 = NodeStore(db_path=db)
    sup2 = AgentSupervisor(lease_manager=LeaseManager(), node_store=s2)
    sup2.restore_from_store()
    assert sup2.online_agents() == {}


def test_restore_returns_zero_without_store() -> None:
    """restore_from_store() is a no-op returning 0 when persistence is disabled."""
    sup = AgentSupervisor(lease_manager=LeaseManager(), node_store=None)
    assert sup.restore_from_store() == 0


def test_restore_skips_already_registered(tmp_path: Path) -> None:
    """Nodes manually registered before restore_from_store() are not duplicated."""
    db = tmp_path / "nodes.db"
    store = NodeStore(db_path=db)
    store.upsert("heimdall", "http://127.0.0.1:7701")
    sup = AgentSupervisor(lease_manager=LeaseManager(), node_store=store)
    sup.register("heimdall", "http://127.0.0.1:7701")  # already in memory
    restored = sup.restore_from_store()
    assert restored == 0  # already present, not double-counted


# ── restored node comes online after poll ─────────────────────────────────────

@pytest.mark.asyncio
async def test_restored_node_comes_online_after_poll(tmp_path: Path) -> None:
    """After restore, a successful poll_agent() brings the node online."""
    db = tmp_path / "nodes.db"
    store = NodeStore(db_path=db)
    store.upsert("navi", "http://10.1.10.10:7701")
    sup = AgentSupervisor(lease_manager=LeaseManager(), node_store=store)
    sup.restore_from_store()
    # Stub poll_agent to succeed: first GET returns GPU info, second the
    # resident-allocation list (order matches the side_effect list below).
    gpu_payload = {"gpus": [{"gpu_id": 0, "name": "RTX 4000",
                             "vram_total_mb": 8192, "vram_used_mb": 512, "vram_free_mb": 7680}]}
    resident_payload = {"residents": []}
    mock_resp_gpu = MagicMock()
    mock_resp_gpu.raise_for_status = MagicMock()
    mock_resp_gpu.json.return_value = gpu_payload
    mock_resp_res = MagicMock()
    mock_resp_res.is_success = True
    mock_resp_res.json.return_value = resident_payload
    # AsyncClient is used as an async context manager, so __aenter__/__aexit__
    # must be stubbed explicitly to hand back the mocked client.
    mock_client = AsyncMock()
    mock_client.get = AsyncMock(side_effect=[mock_resp_gpu, mock_resp_res])
    mock_client.__aenter__ = AsyncMock(return_value=mock_client)
    mock_client.__aexit__ = AsyncMock(return_value=False)
    with patch("circuitforge_core.resources.coordinator.agent_supervisor.httpx.AsyncClient",
               return_value=mock_client):
        result = await sup.poll_agent("navi")
    assert result is True
    assert "navi" in sup.online_agents()

View file

@ -0,0 +1,87 @@
# tests/test_resources/test_node_store.py
"""Unit tests for NodeStore — SQLite persistence layer for known agent nodes."""
from __future__ import annotations
import time
from pathlib import Path
import pytest
from circuitforge_core.resources.coordinator.node_store import NodeStore
@pytest.fixture
def store(tmp_path: Path) -> NodeStore:
    """A NodeStore backed by a throwaway SQLite file under pytest's tmp dir."""
    return NodeStore(db_path=tmp_path / "test-nodes.db")


def test_upsert_and_all(store: NodeStore) -> None:
    """A single upsert is returned verbatim by all()."""
    store.upsert("heimdall", "http://127.0.0.1:7701")
    assert store.all() == [("heimdall", "http://127.0.0.1:7701")]


def test_upsert_updates_url(store: NodeStore) -> None:
    """Upserting the same node twice keeps one row with the newest URL."""
    store.upsert("navi", "http://10.1.10.10:7701")
    store.upsert("navi", "http://10.1.10.10:7702")
    (row,) = store.all()
    assert row[1] == "http://10.1.10.10:7702"


def test_multiple_nodes(store: NodeStore) -> None:
    """Distinct node_ids each get their own row."""
    for node_id, url in [
        ("heimdall", "http://127.0.0.1:7701"),
        ("navi", "http://10.1.10.10:7701"),
        ("strahl", "http://10.1.10.20:7701"),
    ]:
        store.upsert(node_id, url)
    assert len(store.all()) == 3


def test_remove(store: NodeStore) -> None:
    """remove() deletes only the named node."""
    store.upsert("heimdall", "http://127.0.0.1:7701")
    store.upsert("navi", "http://10.1.10.10:7701")
    store.remove("navi")
    remaining = [node_id for node_id, _ in store.all()]
    assert "navi" not in remaining
    assert "heimdall" in remaining


def test_prune_stale_removes_old_entries(store: NodeStore) -> None:
    """prune_stale drops rows older than the cutoff and keeps fresh ones."""
    # Backdate a row well past the cutoff — upsert() would stamp "now",
    # so insert directly via the connection.
    store._conn.execute(
        "INSERT INTO known_nodes (node_id, agent_url, last_seen) VALUES (?, ?, ?)",
        ("ghost", "http://dead:7701", time.time() - 40 * 86400),
    )
    store._conn.commit()
    store.upsert("live", "http://live:7701")
    assert store.prune_stale(max_age_days=30) == 1
    surviving = [node_id for node_id, _ in store.all()]
    assert "ghost" not in surviving
    assert "live" in surviving


def test_prune_stale_keeps_recent(store: NodeStore) -> None:
    """A freshly-seen node survives pruning."""
    store.upsert("recent", "http://recent:7701")
    assert store.prune_stale(max_age_days=30) == 0
    assert len(store.all()) == 1


def test_all_empty(store: NodeStore) -> None:
    """An empty database yields an empty list, not None."""
    assert store.all() == []


def test_db_persists_across_instances(tmp_path: Path) -> None:
    """Data written by one NodeStore instance is visible to a new one on the same file."""
    db = tmp_path / "shared.db"
    writer = NodeStore(db_path=db)
    writer.upsert("navi", "http://10.1.10.10:7701")
    writer.close()
    reader = NodeStore(db_path=db)
    try:
        assert [node_id for node_id, _ in reader.all()] == ["navi"]
    finally:
        reader.close()

View file

@ -0,0 +1,176 @@
# tests/test_resources/test_ollama_adopt.py
"""
Tests for the Ollama adopt-if-running path:
- ProcessSpec: adopt and health_path fields parsed from YAML
- ServiceManager.start(): adopt path claims running service; falls through if not running
- ServiceManager.is_running(): adopt path uses health probe, not proc table
- ServiceInstance.health_path persists through upsert_instance
- Probe loop uses inst.health_path instead of hardcoded /health
"""
from __future__ import annotations
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from circuitforge_core.resources.agent.service_manager import ServiceManager
from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry
from circuitforge_core.resources.profiles.schema import GpuProfile, ProcessSpec, ServiceProfile, load_profile
# ── ProcessSpec schema ────────────────────────────────────────────────────────

def test_process_spec_defaults():
    """New fields default off so pre-existing specs keep their behavior."""
    spec = ProcessSpec(exec_path="/usr/local/bin/ollama")
    assert spec.adopt is False
    assert spec.health_path == "/health"


def test_process_spec_adopt_fields():
    """adopt/health_path are accepted and round-trip through the model."""
    spec = ProcessSpec(
        exec_path="/usr/local/bin/ollama",
        adopt=True,
        health_path="/api/tags",
        port=11434,
        host_port=11434,
    )
    assert spec.adopt is True
    assert spec.health_path == "/api/tags"


def test_profile_yaml_parses_adopt(tmp_path: Path):
    """load_profile() parses a managed process block with adopt/health_path."""
    # NOTE(review): YAML nesting below reconstructed to standard 2-space
    # indentation — confirm against the original file if parsing differs.
    yaml_text = """\
schema_version: 1
name: test
services:
  ollama:
    max_mb: 4096
    priority: 1
    managed:
      type: process
      adopt: true
      exec_path: /usr/local/bin/ollama
      args_template: serve
      port: 11434
      host_port: 11434
      health_path: /api/tags
"""
    p = tmp_path / "profile.yaml"
    p.write_text(yaml_text)
    profile = load_profile(p)
    spec = profile.services["ollama"].managed
    assert isinstance(spec, ProcessSpec)
    assert spec.adopt is True
    assert spec.health_path == "/api/tags"
    assert spec.host_port == 11434


# ── ServiceManager adopt path ─────────────────────────────────────────────────

def _make_manager_with_ollama(advertise_host: str = "127.0.0.1") -> ServiceManager:
    """Build a ServiceManager whose only service is an adopt=True Ollama spec."""
    profile = GpuProfile(
        schema_version=1,
        name="test",
        services={
            "ollama": ServiceProfile(
                max_mb=4096,
                priority=1,
                managed=ProcessSpec(
                    exec_path="/usr/local/bin/ollama",
                    args_template="serve",
                    port=11434,
                    host_port=11434,
                    adopt=True,
                    health_path="/api/tags",
                ),
            )
        },
    )
    return ServiceManager(node_id="heimdall", profile=profile, advertise_host=advertise_host)


def test_start_adopt_claims_running_service():
    """When Ollama is already healthy, start() returns its URL without spawning a process."""
    mgr = _make_manager_with_ollama()
    with patch.object(mgr, "_probe_health", return_value=True) as mock_probe:
        url = mgr.start("ollama", gpu_id=0, params={})
    assert url == "http://127.0.0.1:11434"
    mock_probe.assert_called_once_with(11434, "/api/tags")
    assert "ollama" not in mgr._procs  # no subprocess spawned


def test_start_adopt_spawns_when_not_running():
    """When Ollama is not yet running, start() spawns it normally."""
    mgr = _make_manager_with_ollama()
    mock_proc = MagicMock()
    mock_proc.poll.return_value = None  # poll()->None means "still alive"
    with patch.object(mgr, "_probe_health", return_value=False), \
         patch("subprocess.Popen", return_value=mock_proc) as mock_popen:
        url = mgr.start("ollama", gpu_id=0, params={})
    assert url == "http://127.0.0.1:11434"
    mock_popen.assert_called_once()
    assert "ollama" in mgr._procs


def test_is_running_adopt_uses_health_probe():
    """is_running() for adopt=True services checks the health endpoint, not the proc table."""
    mgr = _make_manager_with_ollama()
    with patch.object(mgr, "_probe_health", return_value=True):
        assert mgr.is_running("ollama") is True
    with patch.object(mgr, "_probe_health", return_value=False):
        assert mgr.is_running("ollama") is False


def test_probe_health_returns_true_on_200():
    """_probe_health() maps an HTTP 200 to True."""
    mgr = _make_manager_with_ollama()
    # urlopen is used as a context manager, so __enter__/__exit__ are stubbed.
    mock_resp = MagicMock()
    mock_resp.status = 200
    mock_resp.__enter__ = lambda s: mock_resp
    mock_resp.__exit__ = MagicMock(return_value=False)
    with patch("urllib.request.urlopen", return_value=mock_resp):
        assert mgr._probe_health(11434, "/api/tags") is True


def test_probe_health_returns_false_on_connection_error():
    """_probe_health() swallows connection errors and reports unhealthy."""
    mgr = _make_manager_with_ollama()
    with patch("urllib.request.urlopen", side_effect=OSError("refused")):
        assert mgr._probe_health(11434, "/api/tags") is False


# ── ServiceRegistry health_path ───────────────────────────────────────────────

def test_upsert_instance_stores_health_path():
    """An explicit health_path is stored on the instance record."""
    reg = ServiceRegistry()
    inst = reg.upsert_instance(
        service="ollama", node_id="heimdall", gpu_id=0,
        state="running", model=None, url="http://127.0.0.1:11434",
        health_path="/api/tags",
    )
    assert inst.health_path == "/api/tags"


def test_upsert_instance_default_health_path():
    """Omitting health_path keeps the backwards-compatible /health default."""
    reg = ServiceRegistry()
    inst = reg.upsert_instance(
        service="vllm", node_id="heimdall", gpu_id=0,
        state="starting", model="qwen", url="http://127.0.0.1:8000",
    )
    assert inst.health_path == "/health"


def test_all_gpu_profiles_have_ollama_managed_block():
    """Sanity check: all public GPU profiles now have a managed block for ollama."""
    from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
    registry = ProfileRegistry()
    for profile in registry.list_public():
        svc = profile.services.get("ollama")
        if svc is None:
            continue  # profile may not define ollama
        assert svc.managed is not None, f"{profile.name}: ollama missing managed block"
        assert isinstance(svc.managed, ProcessSpec)
        assert svc.managed.adopt is True, f"{profile.name}: ollama adopt should be True"
        assert svc.managed.health_path == "/api/tags", f"{profile.name}: wrong health_path"