From a54a5304933254c0b7ed9276e0327c9b7dea4ed0 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Thu, 2 Apr 2026 22:01:55 -0700 Subject: [PATCH 1/2] =?UTF-8?q?feat:=20agent=20watchdog=20=E2=80=94=20pers?= =?UTF-8?q?ist=20known=20nodes=20+=20auto-reconnect=20after=20coordinator?= =?UTF-8?q?=20restart?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit closes #15 - NodeStore: SQLite persistence for known agent nodes (~/.local/share/circuitforge/cf-orch-nodes.db) - upsert on every register(); prune_stale() for 30-day cleanup - survives coordinator restarts — data readable by next process - AgentSupervisor.restore_from_store(): reload known nodes on startup, mark all offline; heartbeat loop brings back any that respond - AgentSupervisor.register(): persists to NodeStore on every call - cli.py coordinator: NodeStore wired in; restore_from_store() called before uvicorn starts - cli.py agent: one-shot registration replaced with persistent reconnect loop (daemon thread, 30 s interval) — coordinator restart → nodes reappear within one cycle with no manual intervention on agent hosts - 16 new tests: NodeStore (8) + AgentSupervisor watchdog (8) --- circuitforge_core/resources/cli.py | 66 +++++--- .../resources/coordinator/agent_supervisor.py | 27 ++++ .../resources/coordinator/node_store.py | 85 ++++++++++ tests/test_resources/test_agent_watchdog.py | 151 ++++++++++++++++++ tests/test_resources/test_node_store.py | 87 ++++++++++ 5 files changed, 396 insertions(+), 20 deletions(-) create mode 100644 circuitforge_core/resources/coordinator/node_store.py create mode 100644 tests/test_resources/test_agent_watchdog.py create mode 100644 tests/test_resources/test_node_store.py diff --git a/circuitforge_core/resources/cli.py b/circuitforge_core/resources/cli.py index dc6883f..7238507 100644 --- a/circuitforge_core/resources/cli.py +++ b/circuitforge_core/resources/cli.py @@ -1,5 +1,6 @@ from __future__ import annotations +import logging import sys 
from pathlib import Path from typing import Annotated, Optional @@ -7,6 +8,8 @@ from typing import Annotated, Optional import typer import uvicorn +logger = logging.getLogger(__name__) + app = typer.Typer(name="cf-orch", help="CircuitForge GPU resource orchestrator") _SYSTEMD_UNIT_PATH = Path("/etc/systemd/system/cf-orch.service") @@ -47,14 +50,21 @@ def start( from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry from circuitforge_core.resources.agent.gpu_monitor import GpuMonitor + from circuitforge_core.resources.coordinator.node_store import NodeStore + lease_manager = LeaseManager() profile_registry = ProfileRegistry() service_registry = ServiceRegistry() + node_store = NodeStore() supervisor = AgentSupervisor( lease_manager=lease_manager, service_registry=service_registry, profile_registry=profile_registry, + node_store=node_store, ) + restored = supervisor.restore_from_store() + if restored: + typer.echo(f"Restored {restored} known node(s) from previous session") monitor = GpuMonitor() gpus = monitor.poll() @@ -119,27 +129,43 @@ def agent( reach_host = advertise_host or ("127.0.0.1" if host in ("0.0.0.0", "::") else host) agent_url = f"http://{reach_host}:{port}" - def _register_in_background() -> None: - """POST registration to coordinator after a short delay (uvicorn needs ~1s to bind).""" - import time - time.sleep(2.0) - try: - resp = httpx.post( - f"{coordinator}/api/nodes", - json={"node_id": node_id, "agent_url": agent_url}, - timeout=5.0, - ) - if resp.is_success: - typer.echo(f"Registered with coordinator at {coordinator} as '{node_id}'") - else: - typer.echo( - f"Warning: coordinator registration returned {resp.status_code}", err=True - ) - except Exception as exc: - typer.echo(f"Warning: could not reach coordinator at {coordinator}: {exc}", err=True) + _RECONNECT_INTERVAL_S = 30.0 - # Fire registration in a daemon thread so uvicorn.run() can start blocking immediately. 
- threading.Thread(target=_register_in_background, daemon=True).start() + def _reconnect_loop() -> None: + """ + Persistently re-register this agent with the coordinator. + + Runs as a daemon thread for the lifetime of the agent process: + - Waits 2 s on first run (uvicorn needs time to bind) + - Re-registers every 30 s thereafter + - If the coordinator is down, silently retries — no crashing + - When the coordinator restarts, the agent re-appears within one cycle + + This means coordinator restarts require no manual intervention on agent hosts. + """ + import time + first = True + while True: + time.sleep(2.0 if first else _RECONNECT_INTERVAL_S) + first = False + try: + resp = httpx.post( + f"{coordinator}/api/nodes", + json={"node_id": node_id, "agent_url": agent_url}, + timeout=5.0, + ) + if resp.is_success: + logger.debug("Registered with coordinator at %s as '%s'", coordinator, node_id) + else: + logger.warning( + "Coordinator registration returned %s", resp.status_code + ) + except Exception as exc: + logger.debug("Coordinator at %s unreachable, will retry: %s", coordinator, exc) + + # Fire reconnect loop in a daemon thread so uvicorn.run() can start blocking immediately. 
+ threading.Thread(target=_reconnect_loop, daemon=True, name="cf-orch-reconnect").start() + typer.echo(f"Reconnect loop started — will register with {coordinator} every {int(_RECONNECT_INTERVAL_S)}s") service_manager = None try: diff --git a/circuitforge_core/resources/coordinator/agent_supervisor.py b/circuitforge_core/resources/coordinator/agent_supervisor.py index 8536636..503c8c5 100644 --- a/circuitforge_core/resources/coordinator/agent_supervisor.py +++ b/circuitforge_core/resources/coordinator/agent_supervisor.py @@ -8,6 +8,7 @@ from dataclasses import dataclass, field import httpx from circuitforge_core.resources.coordinator.lease_manager import LeaseManager +from circuitforge_core.resources.coordinator.node_store import NodeStore from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry from circuitforge_core.resources.models import GpuInfo, NodeInfo, ResidentAllocation @@ -33,14 +34,38 @@ class AgentSupervisor: lease_manager: LeaseManager, service_registry: ServiceRegistry | None = None, profile_registry: ProfileRegistry | None = None, + node_store: NodeStore | None = None, ) -> None: self._agents: dict[str, AgentRecord] = {} self._lease_manager = lease_manager self._running = False self._service_registry = service_registry self._profile_registry = profile_registry + self._node_store = node_store self._heartbeat_tick = 0 + def restore_from_store(self) -> int: + """ + Load previously-known nodes from NodeStore into the in-memory registry. + + All restored nodes start as online=False. The heartbeat loop will poll + them on its first tick and promote any that respond to online=True. + + Returns the number of nodes restored. 
+ """ + if self._node_store is None: + return 0 + restored = 0 + for node_id, agent_url in self._node_store.all(): + if node_id not in self._agents: + self._agents[node_id] = AgentRecord( + node_id=node_id, agent_url=agent_url, online=False + ) + restored += 1 + if restored: + logger.info("NodeStore: restored %d known node(s) from previous session", restored) + return restored + def register(self, node_id: str, agent_url: str) -> None: if node_id not in self._agents: self._agents[node_id] = AgentRecord(node_id=node_id, agent_url=agent_url) @@ -49,6 +74,8 @@ class AgentSupervisor: if self._agents[node_id].agent_url != agent_url: self._agents[node_id].agent_url = agent_url logger.info("Updated agent URL for %s → %s", node_id, agent_url) + if self._node_store is not None: + self._node_store.upsert(node_id, agent_url) def get_node_info(self, node_id: str) -> NodeInfo | None: record = self._agents.get(node_id) diff --git a/circuitforge_core/resources/coordinator/node_store.py b/circuitforge_core/resources/coordinator/node_store.py new file mode 100644 index 0000000..8dc71f9 --- /dev/null +++ b/circuitforge_core/resources/coordinator/node_store.py @@ -0,0 +1,85 @@ +""" +circuitforge_core.resources.coordinator.node_store — SQLite persistence for known agent nodes. + +Gives the coordinator restart-safe memory of which nodes have ever registered. +On startup the coordinator reloads all known nodes and immediately probes them; +nodes that respond come back online within one heartbeat cycle (~10 s) without +any manual intervention on the agent hosts. 
+""" +from __future__ import annotations + +import logging +import sqlite3 +import time +from pathlib import Path + +logger = logging.getLogger(__name__) + +_DEFAULT_DB_PATH = Path.home() / ".local" / "share" / "circuitforge" / "cf-orch-nodes.db" +_STALE_AGE_DAYS = 30 # nodes unseen for this long are pruned automatically + + +class NodeStore: + """ + Thin SQLite wrapper for persisting known agent nodes across coordinator restarts. + + Thread-safe for single-writer use (coordinator runs in one asyncio thread). + """ + + def __init__(self, db_path: Path = _DEFAULT_DB_PATH) -> None: + self.db_path = db_path + db_path.parent.mkdir(parents=True, exist_ok=True) + self._conn = sqlite3.connect(str(db_path), check_same_thread=False) + self._conn.row_factory = sqlite3.Row + self._migrate() + logger.debug("NodeStore initialised at %s", db_path) + + def _migrate(self) -> None: + self._conn.executescript(""" + CREATE TABLE IF NOT EXISTS known_nodes ( + node_id TEXT PRIMARY KEY, + agent_url TEXT NOT NULL, + last_seen REAL NOT NULL + ); + """) + self._conn.commit() + + def upsert(self, node_id: str, agent_url: str) -> None: + """Record or update a node. Called on every successful registration.""" + self._conn.execute( + """ + INSERT INTO known_nodes (node_id, agent_url, last_seen) + VALUES (?, ?, ?) 
+ ON CONFLICT(node_id) DO UPDATE SET + agent_url = excluded.agent_url, + last_seen = excluded.last_seen + """, + (node_id, agent_url, time.time()), + ) + self._conn.commit() + + def all(self) -> list[tuple[str, str]]: + """Return all known (node_id, agent_url) pairs.""" + rows = self._conn.execute( + "SELECT node_id, agent_url FROM known_nodes ORDER BY last_seen DESC" + ).fetchall() + return [(r["node_id"], r["agent_url"]) for r in rows] + + def remove(self, node_id: str) -> None: + self._conn.execute("DELETE FROM known_nodes WHERE node_id = ?", (node_id,)) + self._conn.commit() + + def prune_stale(self, max_age_days: int = _STALE_AGE_DAYS) -> int: + """Delete nodes not seen within max_age_days. Returns count removed.""" + cutoff = time.time() - max_age_days * 86400 + cur = self._conn.execute( + "DELETE FROM known_nodes WHERE last_seen < ?", (cutoff,) + ) + self._conn.commit() + removed = cur.rowcount + if removed: + logger.info("NodeStore: pruned %d stale node(s) (>%d days old)", removed, max_age_days) + return removed + + def close(self) -> None: + self._conn.close() diff --git a/tests/test_resources/test_agent_watchdog.py b/tests/test_resources/test_agent_watchdog.py new file mode 100644 index 0000000..78b6d09 --- /dev/null +++ b/tests/test_resources/test_agent_watchdog.py @@ -0,0 +1,151 @@ +# tests/test_resources/test_agent_watchdog.py +""" +Tests for AgentSupervisor watchdog behaviour: + - restore_from_store() reloads known nodes from NodeStore on startup + - register() persists to NodeStore + - restored nodes start offline and come online after a successful poll + - NodeStore=None path is a no-op (backwards compatibility) +""" +from __future__ import annotations + +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor +from circuitforge_core.resources.coordinator.lease_manager import LeaseManager +from 
circuitforge_core.resources.coordinator.node_store import NodeStore + + +# ── fixtures ────────────────────────────────────────────────────────────────── + +@pytest.fixture +def store(tmp_path: Path) -> NodeStore: + return NodeStore(db_path=tmp_path / "nodes.db") + + +@pytest.fixture +def supervisor(store: NodeStore) -> AgentSupervisor: + return AgentSupervisor(lease_manager=LeaseManager(), node_store=store) + + +@pytest.fixture +def supervisor_no_store() -> AgentSupervisor: + return AgentSupervisor(lease_manager=LeaseManager(), node_store=None) + + +# ── register() persists ─────────────────────────────────────────────────────── + +def test_register_persists_to_store(supervisor: AgentSupervisor, store: NodeStore) -> None: + supervisor.register("heimdall", "http://127.0.0.1:7701") + rows = store.all() + assert len(rows) == 1 + assert rows[0] == ("heimdall", "http://127.0.0.1:7701") + + +def test_register_updates_url_in_store(supervisor: AgentSupervisor, store: NodeStore) -> None: + supervisor.register("navi", "http://10.1.10.10:7701") + supervisor.register("navi", "http://10.1.10.10:9999") + rows = store.all() + assert len(rows) == 1 + assert rows[0][1] == "http://10.1.10.10:9999" + + +def test_register_without_store_does_not_crash(supervisor_no_store: AgentSupervisor) -> None: + supervisor_no_store.register("heimdall", "http://127.0.0.1:7701") + assert supervisor_no_store.get_node_info("heimdall") is not None + + +# ── restore_from_store() ────────────────────────────────────────────────────── + +def test_restore_loads_known_nodes(tmp_path: Path) -> None: + """Nodes written by a previous supervisor session are restored into a fresh one.""" + db = tmp_path / "nodes.db" + + # Session 1: register two nodes + s1 = NodeStore(db_path=db) + sup1 = AgentSupervisor(lease_manager=LeaseManager(), node_store=s1) + sup1.register("navi", "http://10.1.10.10:7701") + sup1.register("strahl", "http://10.1.10.20:7701") + + # Session 2: fresh supervisor, same DB + s2 = 
NodeStore(db_path=db) + sup2 = AgentSupervisor(lease_manager=LeaseManager(), node_store=s2) + restored = sup2.restore_from_store() + + assert restored == 2 + assert sup2.get_node_info("navi") is not None + assert sup2.get_node_info("strahl") is not None + + +def test_restore_marks_nodes_offline(tmp_path: Path) -> None: + """Restored nodes start offline — they haven't been polled yet.""" + db = tmp_path / "nodes.db" + + s1 = NodeStore(db_path=db) + AgentSupervisor(lease_manager=LeaseManager(), node_store=s1).register( + "navi", "http://10.1.10.10:7701" + ) + + s2 = NodeStore(db_path=db) + sup2 = AgentSupervisor(lease_manager=LeaseManager(), node_store=s2) + sup2.restore_from_store() + + assert sup2.online_agents() == {} + + +def test_restore_returns_zero_without_store() -> None: + sup = AgentSupervisor(lease_manager=LeaseManager(), node_store=None) + assert sup.restore_from_store() == 0 + + +def test_restore_skips_already_registered(tmp_path: Path) -> None: + """Nodes manually registered before restore_from_store() are not duplicated.""" + db = tmp_path / "nodes.db" + store = NodeStore(db_path=db) + store.upsert("heimdall", "http://127.0.0.1:7701") + + sup = AgentSupervisor(lease_manager=LeaseManager(), node_store=store) + sup.register("heimdall", "http://127.0.0.1:7701") # already in memory + restored = sup.restore_from_store() + + assert restored == 0 # already present, not double-counted + + +# ── restored node comes online after poll ───────────────────────────────────── + +@pytest.mark.asyncio +async def test_restored_node_comes_online_after_poll(tmp_path: Path) -> None: + """After restore, a successful poll_agent() brings the node online.""" + db = tmp_path / "nodes.db" + store = NodeStore(db_path=db) + store.upsert("navi", "http://10.1.10.10:7701") + + sup = AgentSupervisor(lease_manager=LeaseManager(), node_store=store) + sup.restore_from_store() + + # Stub poll_agent to succeed + gpu_payload = {"gpus": [{"gpu_id": 0, "name": "RTX 4000", + "vram_total_mb": 
8192, "vram_used_mb": 512, "vram_free_mb": 7680}]} + resident_payload = {"residents": []} + + mock_resp_gpu = MagicMock() + mock_resp_gpu.raise_for_status = MagicMock() + mock_resp_gpu.json.return_value = gpu_payload + + mock_resp_res = MagicMock() + mock_resp_res.is_success = True + mock_resp_res.json.return_value = resident_payload + + mock_client = AsyncMock() + mock_client.get = AsyncMock(side_effect=[mock_resp_gpu, mock_resp_res]) + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=False) + + with patch("circuitforge_core.resources.coordinator.agent_supervisor.httpx.AsyncClient", + return_value=mock_client): + result = await sup.poll_agent("navi") + + assert result is True + assert "navi" in sup.online_agents() diff --git a/tests/test_resources/test_node_store.py b/tests/test_resources/test_node_store.py new file mode 100644 index 0000000..91b6e0c --- /dev/null +++ b/tests/test_resources/test_node_store.py @@ -0,0 +1,87 @@ +# tests/test_resources/test_node_store.py +"""Unit tests for NodeStore — SQLite persistence layer for known agent nodes.""" +from __future__ import annotations + +import time +from pathlib import Path + +import pytest + +from circuitforge_core.resources.coordinator.node_store import NodeStore + + +@pytest.fixture +def store(tmp_path: Path) -> NodeStore: + return NodeStore(db_path=tmp_path / "test-nodes.db") + + +def test_upsert_and_all(store: NodeStore) -> None: + store.upsert("heimdall", "http://127.0.0.1:7701") + rows = store.all() + assert len(rows) == 1 + assert rows[0] == ("heimdall", "http://127.0.0.1:7701") + + +def test_upsert_updates_url(store: NodeStore) -> None: + store.upsert("navi", "http://10.1.10.10:7701") + store.upsert("navi", "http://10.1.10.10:7702") + rows = store.all() + assert len(rows) == 1 + assert rows[0][1] == "http://10.1.10.10:7702" + + +def test_multiple_nodes(store: NodeStore) -> None: + store.upsert("heimdall", "http://127.0.0.1:7701") + 
store.upsert("navi", "http://10.1.10.10:7701") + store.upsert("strahl", "http://10.1.10.20:7701") + assert len(store.all()) == 3 + + +def test_remove(store: NodeStore) -> None: + store.upsert("heimdall", "http://127.0.0.1:7701") + store.upsert("navi", "http://10.1.10.10:7701") + store.remove("navi") + ids = [r[0] for r in store.all()] + assert "navi" not in ids + assert "heimdall" in ids + + +def test_prune_stale_removes_old_entries(store: NodeStore) -> None: + # Insert a node with a last_seen in the distant past + store._conn.execute( + "INSERT INTO known_nodes (node_id, agent_url, last_seen) VALUES (?, ?, ?)", + ("ghost", "http://dead:7701", time.time() - 40 * 86400), + ) + store._conn.commit() + store.upsert("live", "http://live:7701") + + removed = store.prune_stale(max_age_days=30) + assert removed == 1 + ids = [r[0] for r in store.all()] + assert "ghost" not in ids + assert "live" in ids + + +def test_prune_stale_keeps_recent(store: NodeStore) -> None: + store.upsert("recent", "http://recent:7701") + removed = store.prune_stale(max_age_days=30) + assert removed == 0 + assert len(store.all()) == 1 + + +def test_all_empty(store: NodeStore) -> None: + assert store.all() == [] + + +def test_db_persists_across_instances(tmp_path: Path) -> None: + """Data written by one NodeStore instance is visible to a new one on the same file.""" + db = tmp_path / "shared.db" + s1 = NodeStore(db_path=db) + s1.upsert("navi", "http://10.1.10.10:7701") + s1.close() + + s2 = NodeStore(db_path=db) + rows = s2.all() + assert len(rows) == 1 + assert rows[0][0] == "navi" + s2.close() From 7bb6b76bd5afc0080482b18674c44972b577dc61 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Thu, 2 Apr 2026 22:09:42 -0700 Subject: [PATCH 2/2] feat: ollama adopt-if-running + health_path in ProcessSpec (#16) - ProcessSpec: adopt (bool) and health_path (str, default /health) fields - ServiceManager: adopt=True probes health_path before spawning; is_running() uses health probe for adopt services rather than 
proc table + socket check - _probe_health() helper: urllib GET on localhost:port+path, returns bool - Agent /services/{service}/start: returns adopted=True when service was already running; coordinator sets state=running immediately (no probe wait) - ServiceInstance: health_path field (default /health) - service_registry.upsert_instance(): health_path kwarg - Probe loop uses inst.health_path instead of hardcoded /health - coordinator allocate_service: looks up health_path from profile spec via _get_health_path() and stores on ServiceInstance - All GPU profiles (2/4/6/8/16/24 GB + cpu-16/32): ollama managed block with adopt=true, health_path=/api/tags, port 11434 - 11 new tests --- circuitforge_core/resources/agent/app.py | 6 +- .../resources/agent/service_manager.py | 19 +- .../resources/coordinator/app.py | 20 +- .../resources/coordinator/service_registry.py | 3 + .../resources/profiles/public/cpu-16gb.yaml | 8 + .../resources/profiles/public/cpu-32gb.yaml | 8 + .../profiles/public/single-gpu-16gb.yaml | 8 + .../profiles/public/single-gpu-24gb.yaml | 8 + .../profiles/public/single-gpu-2gb.yaml | 8 + .../profiles/public/single-gpu-4gb.yaml | 8 + .../profiles/public/single-gpu-6gb.yaml | 8 + .../profiles/public/single-gpu-8gb.yaml | 8 + .../resources/profiles/schema.py | 5 + tests/test_resources/test_ollama_adopt.py | 176 ++++++++++++++++++ 14 files changed, 288 insertions(+), 5 deletions(-) create mode 100644 tests/test_resources/test_ollama_adopt.py diff --git a/circuitforge_core/resources/agent/app.py b/circuitforge_core/resources/agent/app.py index 0045041..162e7c5 100644 --- a/circuitforge_core/resources/agent/app.py +++ b/circuitforge_core/resources/agent/app.py @@ -86,8 +86,12 @@ def create_agent_app( @app.post("/services/{service}/start") def start_service(service: str, req: ServiceStartRequest) -> dict: try: + already_running = service_manager.is_running(service) url = service_manager.start(service, req.gpu_id, req.params) - return {"service": service, 
"url": url, "running": True} + # adopted=True signals the coordinator to treat this instance as + # immediately running rather than waiting for the probe loop. + adopted = already_running and service_manager.is_running(service) + return {"service": service, "url": url, "running": True, "adopted": adopted} except (ValueError, NotImplementedError) as exc: raise HTTPException(status_code=422, detail=str(exc)) except Exception as exc: diff --git a/circuitforge_core/resources/agent/service_manager.py b/circuitforge_core/resources/agent/service_manager.py index 336330b..5578c24 100644 --- a/circuitforge_core/resources/agent/service_manager.py +++ b/circuitforge_core/resources/agent/service_manager.py @@ -67,6 +67,10 @@ class ServiceManager: except subprocess.CalledProcessError: return False if isinstance(spec, ProcessSpec): + # For adopt=True services, check the health endpoint regardless of whether + # we spawned the process (it may be a system daemon we didn't start). + if spec.adopt: + return self._probe_health(spec.host_port, spec.health_path) proc = self._procs.get(service) if proc is None or proc.poll() is not None: return False @@ -78,6 +82,16 @@ class ServiceManager: return False return False + def _probe_health(self, port: int, health_path: str = "/health") -> bool: + """Return True if the service at localhost:port responds 200 on health_path.""" + import urllib.request + try: + url = f"http://127.0.0.1:{port}{health_path}" + with urllib.request.urlopen(url, timeout=2.0) as resp: + return resp.status == 200 + except Exception: + return False + def start(self, service: str, gpu_id: int, params: dict[str, str]) -> str: spec = self._get_spec(service) if spec is None: @@ -111,7 +125,10 @@ class ServiceManager: return f"http://{self.advertise_host}:{spec.host_port}" if isinstance(spec, ProcessSpec): - import shlex + # adopt=True: if the service is already healthy, claim it without spawning. 
+ if spec.adopt and self._probe_health(spec.host_port, spec.health_path): + return f"http://{self.advertise_host}:{spec.host_port}" + import subprocess as _sp filler = defaultdict(str, params) diff --git a/circuitforge_core/resources/coordinator/app.py b/circuitforge_core/resources/coordinator/app.py index f55cbb5..1051d7b 100644 --- a/circuitforge_core/resources/coordinator/app.py +++ b/circuitforge_core/resources/coordinator/app.py @@ -19,9 +19,19 @@ from circuitforge_core.resources.coordinator.lease_manager import LeaseManager from circuitforge_core.resources.coordinator.node_selector import select_node from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry +from circuitforge_core.resources.profiles.schema import ProcessSpec _DASHBOARD_HTML = (Path(__file__).parent / "dashboard.html").read_text() + +def _get_health_path(profile_registry: ProfileRegistry, service: str) -> str: + """Return the health_path for a service from the first matching profile spec.""" + for profile in profile_registry.list_public(): + svc = profile.services.get(service) + if svc and isinstance(svc.managed, ProcessSpec): + return svc.managed.health_path + return "/health" + _PROBE_INTERVAL_S = 5.0 # how often to poll starting instances _PROBE_TIMEOUT_S = 300.0 # give up and mark stopped after this many seconds @@ -49,7 +59,7 @@ async def _run_instance_probe_loop(service_registry: ServiceRegistry) -> None: if inst.url: try: with urllib.request.urlopen( - inst.url.rstrip("/") + "/health", timeout=2.0 + inst.url.rstrip("/") + inst.health_path, timeout=2.0 ) as resp: healthy = resp.status == 200 except Exception: @@ -381,8 +391,11 @@ def create_coordinator_app( url=svc_url, ttl_s=req.ttl_s, ) - # Seed the instance state for first-time starts - instance_state = "running" if warm else "starting" + # Seed the instance state for first-time starts. 
+ # adopted=True means the agent found it already running. + adopted = data.get("adopted", False) + instance_state = "running" if (warm or adopted) else "starting" + health_path = _get_health_path(profile_registry, service) service_registry.upsert_instance( service=service, node_id=node_id, @@ -390,6 +403,7 @@ def create_coordinator_app( state=instance_state, model=model, url=svc_url, + health_path=health_path, ) return { "allocation_id": alloc.allocation_id, diff --git a/circuitforge_core/resources/coordinator/service_registry.py b/circuitforge_core/resources/coordinator/service_registry.py index c258ad7..18c7b20 100644 --- a/circuitforge_core/resources/coordinator/service_registry.py +++ b/circuitforge_core/resources/coordinator/service_registry.py @@ -29,6 +29,7 @@ class ServiceInstance: model: str | None url: str | None idle_since: float | None = None + health_path: str = "/health" class ServiceRegistry: @@ -107,6 +108,7 @@ class ServiceRegistry: state: Literal["starting", "running", "idle", "stopped"], model: str | None, url: str | None, + health_path: str = "/health", ) -> ServiceInstance: key = f"{service}:{node_id}:{gpu_id}" existing = self._instances.get(key) @@ -117,6 +119,7 @@ class ServiceRegistry: inst = ServiceInstance( service=service, node_id=node_id, gpu_id=gpu_id, state=state, model=model, url=url, idle_since=idle_since, + health_path=health_path, ) self._instances[key] = inst return inst diff --git a/circuitforge_core/resources/profiles/public/cpu-16gb.yaml b/circuitforge_core/resources/profiles/public/cpu-16gb.yaml index 75a22e8..4ecec14 100644 --- a/circuitforge_core/resources/profiles/public/cpu-16gb.yaml +++ b/circuitforge_core/resources/profiles/public/cpu-16gb.yaml @@ -5,6 +5,14 @@ services: ollama: max_mb: 0 priority: 1 + managed: + type: process + adopt: true + exec_path: "/usr/local/bin/ollama" + args_template: "serve" + port: 11434 + host_port: 11434 + health_path: /api/tags cf-stt: max_mb: 0 priority: 2 diff --git 
a/circuitforge_core/resources/profiles/public/cpu-32gb.yaml b/circuitforge_core/resources/profiles/public/cpu-32gb.yaml index fb7c976..1ae4299 100644 --- a/circuitforge_core/resources/profiles/public/cpu-32gb.yaml +++ b/circuitforge_core/resources/profiles/public/cpu-32gb.yaml @@ -5,6 +5,14 @@ services: ollama: max_mb: 0 priority: 1 + managed: + type: process + adopt: true + exec_path: "/usr/local/bin/ollama" + args_template: "serve" + port: 11434 + host_port: 11434 + health_path: /api/tags cf-stt: max_mb: 0 priority: 2 diff --git a/circuitforge_core/resources/profiles/public/single-gpu-16gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-16gb.yaml index 457c063..70d533d 100644 --- a/circuitforge_core/resources/profiles/public/single-gpu-16gb.yaml +++ b/circuitforge_core/resources/profiles/public/single-gpu-16gb.yaml @@ -17,6 +17,14 @@ services: ollama: max_mb: 12288 priority: 1 + managed: + type: process + adopt: true + exec_path: "/usr/local/bin/ollama" + args_template: "serve" + port: 11434 + host_port: 11434 + health_path: /api/tags cf-vision: max_mb: 3072 priority: 2 diff --git a/circuitforge_core/resources/profiles/public/single-gpu-24gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-24gb.yaml index 5e933bc..073bf0e 100644 --- a/circuitforge_core/resources/profiles/public/single-gpu-24gb.yaml +++ b/circuitforge_core/resources/profiles/public/single-gpu-24gb.yaml @@ -17,6 +17,14 @@ services: ollama: max_mb: 18432 priority: 1 + managed: + type: process + adopt: true + exec_path: "/usr/local/bin/ollama" + args_template: "serve" + port: 11434 + host_port: 11434 + health_path: /api/tags cf-vision: max_mb: 4096 priority: 2 diff --git a/circuitforge_core/resources/profiles/public/single-gpu-2gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-2gb.yaml index d852eea..b845dbc 100644 --- a/circuitforge_core/resources/profiles/public/single-gpu-2gb.yaml +++ b/circuitforge_core/resources/profiles/public/single-gpu-2gb.yaml @@ 
-6,6 +6,14 @@ services: ollama: max_mb: 1536 priority: 1 + managed: + type: process + adopt: true + exec_path: "/usr/local/bin/ollama" + args_template: "serve" + port: 11434 + host_port: 11434 + health_path: /api/tags cf-vision: max_mb: 512 priority: 2 diff --git a/circuitforge_core/resources/profiles/public/single-gpu-4gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-4gb.yaml index b7cb24e..1bec3e3 100644 --- a/circuitforge_core/resources/profiles/public/single-gpu-4gb.yaml +++ b/circuitforge_core/resources/profiles/public/single-gpu-4gb.yaml @@ -6,6 +6,14 @@ services: ollama: max_mb: 3072 priority: 1 + managed: + type: process + adopt: true + exec_path: "/usr/local/bin/ollama" + args_template: "serve" + port: 11434 + host_port: 11434 + health_path: /api/tags cf-vision: max_mb: 1024 priority: 2 diff --git a/circuitforge_core/resources/profiles/public/single-gpu-6gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-6gb.yaml index 1e8d6b3..3446e54 100644 --- a/circuitforge_core/resources/profiles/public/single-gpu-6gb.yaml +++ b/circuitforge_core/resources/profiles/public/single-gpu-6gb.yaml @@ -17,6 +17,14 @@ services: ollama: max_mb: 3584 priority: 1 + managed: + type: process + adopt: true + exec_path: "/usr/local/bin/ollama" + args_template: "serve" + port: 11434 + host_port: 11434 + health_path: /api/tags cf-vision: max_mb: 1536 priority: 2 diff --git a/circuitforge_core/resources/profiles/public/single-gpu-8gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-8gb.yaml index ae3b170..23ab8d5 100644 --- a/circuitforge_core/resources/profiles/public/single-gpu-8gb.yaml +++ b/circuitforge_core/resources/profiles/public/single-gpu-8gb.yaml @@ -17,6 +17,14 @@ services: ollama: max_mb: 4096 priority: 1 + managed: + type: process + adopt: true + exec_path: "/usr/local/bin/ollama" + args_template: "serve" + port: 11434 + host_port: 11434 + health_path: /api/tags cf-vision: max_mb: 2048 priority: 2 diff --git 
a/circuitforge_core/resources/profiles/schema.py b/circuitforge_core/resources/profiles/schema.py index 1ba8ee5..2667916 100644 --- a/circuitforge_core/resources/profiles/schema.py +++ b/circuitforge_core/resources/profiles/schema.py @@ -34,6 +34,11 @@ class ProcessSpec(BaseModel): env: dict[str, str] = Field(default_factory=dict) port: int = 0 host_port: int = 0 + # adopt=True: if the service is already listening on host_port, claim it rather + # than spawning a new process (useful for system daemons like Ollama). + adopt: bool = False + # Override the health probe path; defaults to /health (Ollama uses /api/tags). + health_path: str = "/health" model_config = {"frozen": True} diff --git a/tests/test_resources/test_ollama_adopt.py b/tests/test_resources/test_ollama_adopt.py new file mode 100644 index 0000000..ceaae12 --- /dev/null +++ b/tests/test_resources/test_ollama_adopt.py @@ -0,0 +1,176 @@ +# tests/test_resources/test_ollama_adopt.py +""" +Tests for the Ollama adopt-if-running path: + - ProcessSpec: adopt and health_path fields parsed from YAML + - ServiceManager.start(): adopt path claims running service; falls through if not running + - ServiceManager.is_running(): adopt path uses health probe, not proc table + - ServiceInstance.health_path persists through upsert_instance + - Probe loop uses inst.health_path instead of hardcoded /health +""" +from __future__ import annotations + +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from circuitforge_core.resources.agent.service_manager import ServiceManager +from circuitforge_core.resources.coordinator.service_registry import ServiceRegistry +from circuitforge_core.resources.profiles.schema import GpuProfile, ProcessSpec, ServiceProfile, load_profile + + +# ── ProcessSpec schema ──────────────────────────────────────────────────────── + +def test_process_spec_defaults(): + spec = ProcessSpec(exec_path="/usr/local/bin/ollama") + assert spec.adopt is False + assert 
spec.health_path == "/health" + + +def test_process_spec_adopt_fields(): + spec = ProcessSpec( + exec_path="/usr/local/bin/ollama", + adopt=True, + health_path="/api/tags", + port=11434, + host_port=11434, + ) + assert spec.adopt is True + assert spec.health_path == "/api/tags" + + +def test_profile_yaml_parses_adopt(tmp_path: Path): + yaml_text = """\ +schema_version: 1 +name: test +services: + ollama: + max_mb: 4096 + priority: 1 + managed: + type: process + adopt: true + exec_path: /usr/local/bin/ollama + args_template: serve + port: 11434 + host_port: 11434 + health_path: /api/tags +""" + p = tmp_path / "profile.yaml" + p.write_text(yaml_text) + profile = load_profile(p) + spec = profile.services["ollama"].managed + assert isinstance(spec, ProcessSpec) + assert spec.adopt is True + assert spec.health_path == "/api/tags" + assert spec.host_port == 11434 + + +# ── ServiceManager adopt path ───────────────────────────────────────────────── + +def _make_manager_with_ollama(advertise_host: str = "127.0.0.1") -> ServiceManager: + profile = GpuProfile( + schema_version=1, + name="test", + services={ + "ollama": ServiceProfile( + max_mb=4096, + priority=1, + managed=ProcessSpec( + exec_path="/usr/local/bin/ollama", + args_template="serve", + port=11434, + host_port=11434, + adopt=True, + health_path="/api/tags", + ), + ) + }, + ) + return ServiceManager(node_id="heimdall", profile=profile, advertise_host=advertise_host) + + +def test_start_adopt_claims_running_service(): + """When Ollama is already healthy, start() returns its URL without spawning a process.""" + mgr = _make_manager_with_ollama() + with patch.object(mgr, "_probe_health", return_value=True) as mock_probe: + url = mgr.start("ollama", gpu_id=0, params={}) + assert url == "http://127.0.0.1:11434" + mock_probe.assert_called_once_with(11434, "/api/tags") + assert "ollama" not in mgr._procs # no subprocess spawned + + +def test_start_adopt_spawns_when_not_running(): + """When Ollama is not yet running, 
start() spawns it normally.""" + mgr = _make_manager_with_ollama() + mock_proc = MagicMock() + mock_proc.poll.return_value = None + + with patch.object(mgr, "_probe_health", return_value=False), \ + patch("subprocess.Popen", return_value=mock_proc) as mock_popen: + url = mgr.start("ollama", gpu_id=0, params={}) + + assert url == "http://127.0.0.1:11434" + mock_popen.assert_called_once() + assert "ollama" in mgr._procs + + +def test_is_running_adopt_uses_health_probe(): + """is_running() for adopt=True services checks the health endpoint, not the proc table.""" + mgr = _make_manager_with_ollama() + with patch.object(mgr, "_probe_health", return_value=True): + assert mgr.is_running("ollama") is True + with patch.object(mgr, "_probe_health", return_value=False): + assert mgr.is_running("ollama") is False + + +def test_probe_health_returns_true_on_200(): + mgr = _make_manager_with_ollama() + mock_resp = MagicMock() + mock_resp.status = 200 + mock_resp.__enter__ = lambda s: mock_resp + mock_resp.__exit__ = MagicMock(return_value=False) + + with patch("urllib.request.urlopen", return_value=mock_resp): + assert mgr._probe_health(11434, "/api/tags") is True + + +def test_probe_health_returns_false_on_connection_error(): + mgr = _make_manager_with_ollama() + with patch("urllib.request.urlopen", side_effect=OSError("refused")): + assert mgr._probe_health(11434, "/api/tags") is False + + +# ── ServiceRegistry health_path ─────────────────────────────────────────────── + +def test_upsert_instance_stores_health_path(): + reg = ServiceRegistry() + inst = reg.upsert_instance( + service="ollama", node_id="heimdall", gpu_id=0, + state="running", model=None, url="http://127.0.0.1:11434", + health_path="/api/tags", + ) + assert inst.health_path == "/api/tags" + + +def test_upsert_instance_default_health_path(): + reg = ServiceRegistry() + inst = reg.upsert_instance( + service="vllm", node_id="heimdall", gpu_id=0, + state="starting", model="qwen", url="http://127.0.0.1:8000", + ) + 
assert inst.health_path == "/health" + + +def test_all_gpu_profiles_have_ollama_managed_block(): + """Sanity check: all public GPU profiles now have a managed block for ollama.""" + from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry + registry = ProfileRegistry() + for profile in registry.list_public(): + svc = profile.services.get("ollama") + if svc is None: + continue # profile may not define ollama + assert svc.managed is not None, f"{profile.name}: ollama missing managed block" + assert isinstance(svc.managed, ProcessSpec) + assert svc.managed.adopt is True, f"{profile.name}: ollama adopt should be True" + assert svc.managed.health_path == "/api/tags", f"{profile.name}: wrong health_path"