feat(orch): agent self-registration and coordinator heartbeat loop

coordinator/app.py:
- Add POST /api/nodes — agents POST {node_id, agent_url} to self-register;
  coordinator immediately polls the new agent for GPU info
- Add lifespan context manager that starts/stops AgentSupervisor heartbeat
  loop (previously the loop was never started)

cli.py start:
- Add --node-id flag (default 'local')
- Pre-register the local agent URL (http://127.0.0.1:{agent_port}) so the
  heartbeat loop can poll it immediately on startup
- Drop redundant lease_manager.register_gpu() call — supervisor.poll_agent()
  now does this via the heartbeat after the agent responds

cli.py agent:
- Add --advertise-host flag for NATted/multi-homed nodes
- Fire the registration POST to the coordinator from a daemon thread after a
  2s delay, so uvicorn.run() can start binding immediately and is only
  called once
This commit is contained in:
pyr0ball 2026-03-31 19:20:35 -07:00
parent 4596aad290
commit 67701f0d29
2 changed files with 73 additions and 7 deletions

View file

@ -32,9 +32,14 @@ def start(
profile: Annotated[Optional[Path], typer.Option(help="Profile YAML path")] = None,
host: str = "0.0.0.0",
port: int = 7700,
node_id: str = "local",
agent_port: int = 7701,
) -> None:
"""Start the cf-orch coordinator (auto-detects GPU profile if not specified)."""
"""Start the cf-orch coordinator (auto-detects GPU profile if not specified).
Automatically pre-registers the local agent so its GPUs appear on the
dashboard immediately. Remote nodes self-register via POST /api/nodes.
"""
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor
@ -52,8 +57,6 @@ def start(
"Warning: no GPUs detected via nvidia-smi — coordinator running with 0 VRAM"
)
else:
for gpu in gpus:
lease_manager.register_gpu("local", gpu.gpu_id, gpu.vram_total_mb)
typer.echo(f"Detected {len(gpus)} GPU(s)")
if profile:
@ -67,6 +70,11 @@ def start(
)
typer.echo(f"Auto-selected profile: {active_profile.name}")
# Pre-register the local agent — the heartbeat loop will poll it for live GPU data.
local_agent_url = f"http://127.0.0.1:{agent_port}"
supervisor.register(node_id, local_agent_url)
typer.echo(f"Registered local node '{node_id}'{local_agent_url}")
coordinator_app = create_coordinator_app(
lease_manager=lease_manager,
profile_registry=profile_registry,
@ -83,10 +91,47 @@ def agent(
node_id: str = "local",
host: str = "0.0.0.0",
port: int = 7701,
advertise_host: Optional[str] = None,
) -> None:
"""Start a cf-orch node agent (for remote nodes like Navi, Huginn)."""
"""Start a cf-orch node agent and self-register with the coordinator.
The agent starts its HTTP server, then POSTs its URL to the coordinator
so it appears on the dashboard without manual configuration.
Use --advertise-host to override the IP the coordinator should use to
reach this agent (e.g. on a multi-homed or NATted host).
"""
import asyncio
import threading
import httpx
from circuitforge_core.resources.agent.app import create_agent_app
# The URL the coordinator should use to reach this agent.
reach_host = advertise_host or ("127.0.0.1" if host in ("0.0.0.0", "::") else host)
agent_url = f"http://{reach_host}:{port}"
def _register_in_background() -> None:
    """Announce this agent to the coordinator from a background thread.

    Waits two seconds first to give uvicorn time to bind, then POSTs the
    node id and agent URL to the coordinator's ``/api/nodes`` endpoint.
    Failures while contacting the coordinator are reported as warnings
    (echoed with ``err=True``) instead of being raised.
    """
    import time

    time.sleep(2.0)
    try:
        endpoint = f"{coordinator}/api/nodes"
        payload = {"node_id": node_id, "agent_url": agent_url}
        resp = httpx.post(endpoint, json=payload, timeout=5.0)
        if not resp.is_success:
            # The coordinator answered, but with a non-success status.
            typer.echo(
                f"Warning: coordinator registration returned {resp.status_code}", err=True
            )
            return
        typer.echo(f"Registered with coordinator at {coordinator} as '{node_id}'")
    except Exception as exc:
        # Best-effort: a failed registration must not take the agent down.
        typer.echo(f"Warning: could not reach coordinator at {coordinator}: {exc}", err=True)
# Fire registration in a daemon thread so uvicorn.run() can start blocking immediately.
threading.Thread(target=_register_in_background, daemon=True).start()
agent_app = create_agent_app(node_id=node_id)
typer.echo(f"Starting cf-orch agent [{node_id}] on {host}:{port}")
uvicorn.run(agent_app, host=host, port=port)

View file

@ -1,5 +1,6 @@
from __future__ import annotations
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any
@ -7,13 +8,13 @@ from fastapi import FastAPI, HTTPException
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
_DASHBOARD_HTML = (Path(__file__).parent / "dashboard.html").read_text()
from circuitforge_core.resources.coordinator.agent_supervisor import AgentSupervisor
from circuitforge_core.resources.coordinator.eviction_engine import EvictionEngine
from circuitforge_core.resources.coordinator.lease_manager import LeaseManager
from circuitforge_core.resources.coordinator.profile_registry import ProfileRegistry
_DASHBOARD_HTML = (Path(__file__).parent / "dashboard.html").read_text()
class LeaseRequest(BaseModel):
node_id: str
@ -24,6 +25,11 @@ class LeaseRequest(BaseModel):
ttl_s: float = 0.0
# Request body accepted by POST /api/nodes (agent self-registration).
class NodeRegisterRequest(BaseModel):
    # Identifier the agent registers under.
    node_id: str
    # URL the agent is registered with, e.g. "http://10.1.10.71:7701"
    agent_url: str
def create_coordinator_app(
lease_manager: LeaseManager,
profile_registry: ProfileRegistry,
@ -31,7 +37,15 @@ def create_coordinator_app(
) -> FastAPI:
eviction_engine = EvictionEngine(lease_manager=lease_manager)
app = FastAPI(title="cf-orch-coordinator")
@asynccontextmanager
async def _lifespan(app: FastAPI):  # type: ignore[type-arg]
    """Run the agent supervisor's heartbeat loop for the app's lifetime.

    On startup, schedules ``agent_supervisor.run_heartbeat_loop()`` as a
    background task. On shutdown, asks the supervisor to stop, cancels the
    task, and waits for it to finish.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).
    """
    import asyncio

    heartbeat_task = asyncio.create_task(agent_supervisor.run_heartbeat_loop())
    try:
        yield
    finally:
        # Run cleanup even when an exception is thrown into the generator at
        # the yield point; otherwise the heartbeat task would be leaked.
        agent_supervisor.stop()
        heartbeat_task.cancel()
        # Wait for the cancellation to be processed so the task is not still
        # pending when the event loop closes. return_exceptions=True collects
        # the task's CancelledError (or an earlier failure of the loop)
        # without re-raising it during shutdown.
        await asyncio.gather(heartbeat_task, return_exceptions=True)
app = FastAPI(title="cf-orch-coordinator", lifespan=_lifespan)
@app.get("/", response_class=HTMLResponse, include_in_schema=False)
def dashboard() -> HTMLResponse:
@ -65,6 +79,13 @@ def create_coordinator_app(
]
}
@app.post("/api/nodes")
async def register_node(req: NodeRegisterRequest) -> dict[str, Any]:
    """Agents call this to self-register. Coordinator immediately polls for GPU info."""
    node_id = req.node_id
    # Record the agent's URL under its node id with the supervisor.
    agent_supervisor.register(node_id, req.agent_url)
    # Poll the newly registered node before replying to the agent.
    await agent_supervisor.poll_agent(node_id)
    response: dict[str, Any] = {"registered": True, "node_id": node_id}
    return response
@app.get("/api/profiles")
def get_profiles() -> dict[str, Any]:
return {