feat(orch): background health probe loop — starting → running transition

Coordinator now polls all 'starting' instances every 5 s via GET /health.
On 200: state → running. After 300 s without a healthy response: state →
stopped. Closes #10.
This commit is contained in:
pyr0ball 2026-04-02 17:18:16 -07:00
parent bd132851ec
commit a7290c1240

View file

@ -1,9 +1,14 @@
from __future__ import annotations from __future__ import annotations
import logging
import time
import urllib.request
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
logger = logging.getLogger(__name__)
from fastapi import FastAPI, HTTPException from fastapi import FastAPI, HTTPException
from fastapi.responses import HTMLResponse from fastapi.responses import HTMLResponse
from pydantic import BaseModel from pydantic import BaseModel
@ -17,6 +22,54 @@ from circuitforge_core.resources.coordinator.service_registry import ServiceRegi
_DASHBOARD_HTML = (Path(__file__).parent / "dashboard.html").read_text() _DASHBOARD_HTML = (Path(__file__).parent / "dashboard.html").read_text()
_PROBE_INTERVAL_S = 5.0  # seconds to sleep between polling passes over 'starting' instances
_PROBE_TIMEOUT_S = 300.0  # seconds an instance may remain 'starting' before it is marked 'stopped'
def _instance_key(inst: Any) -> str:
    """Return the ``service:node_id:gpu_id`` key used to track *inst*."""
    return f"{inst.service}:{inst.node_id}:{inst.gpu_id}"


def _probe_health(url: str) -> bool:
    """
    Blocking ``GET <url>/health`` with a 2 s timeout.

    Returns True only when the response status is 200. Every failure
    (connection refused, timeout, HTTP error status, malformed URL) is
    reported as "not healthy yet" instead of raising, because an instance
    that is still starting is expected to be unreachable for a while.
    """
    try:
        with urllib.request.urlopen(url.rstrip("/") + "/health", timeout=2.0) as resp:
            return resp.status == 200
    except Exception as exc:  # best-effort probe: failure just means "try again later"
        logger.debug("Health probe for %s failed: %s", url, exc)
        return False


async def _probe_starting_instances(
    service_registry: ServiceRegistry, start_times: dict[str, float]
) -> None:
    """
    Run one probe pass over every instance currently in the 'starting' state.

    *start_times* maps instance key -> monotonic time at which the instance
    was first observed as 'starting'; it is updated in place.
    """
    import asyncio

    loop = asyncio.get_running_loop()

    # Phase 1: probe. urlopen() blocks, so each request runs in the default
    # thread-pool executor; the event loop stays responsive and the probes
    # for different instances overlap instead of running back to back.
    candidates = [
        inst
        for inst in service_registry.all_instances()
        if inst.state == "starting" and inst.url
    ]
    outcomes: list[bool] = []
    if candidates:
        outcomes = await asyncio.gather(
            *(loop.run_in_executor(None, _probe_health, inst.url) for inst in candidates)
        )
    healthy_keys = {
        _instance_key(inst) for inst, ok in zip(candidates, outcomes) if ok
    }

    # Phase 2: apply transitions. The registry is re-read here and this phase
    # contains no awaits, so an instance whose state changed while the probes
    # were in flight is not overwritten with a stale decision.
    now = time.monotonic()  # monotonic: immune to wall-clock adjustments
    still_starting: set[str] = set()
    for inst in list(service_registry.all_instances()):
        if inst.state != "starting":
            continue
        key = _instance_key(inst)
        first_seen = start_times.setdefault(key, now)
        if key in healthy_keys:
            new_state = "running"
        elif now - first_seen > _PROBE_TIMEOUT_S:
            new_state = "stopped"
        else:
            still_starting.add(key)
            continue

        service_registry.upsert_instance(
            service=inst.service, node_id=inst.node_id, gpu_id=inst.gpu_id,
            state=new_state, model=inst.model, url=inst.url,
        )
        if new_state == "running":
            logger.info(
                "Instance %s/%s gpu=%s transitioned to running",
                inst.service, inst.node_id, inst.gpu_id,
            )
        else:
            logger.warning(
                "Instance %s/%s gpu=%s timed out in starting state — marked stopped",
                inst.service, inst.node_id, inst.gpu_id,
            )

    # Forget every key that is no longer waiting: instances that transitioned
    # above, instances that left 'starting' by other means, and instances
    # that disappeared from the registry altogether. Without this, a key that
    # later reappears would inherit an old start time and time out early.
    for key in list(start_times):
        if key not in still_starting:
            del start_times[key]


async def _run_instance_probe_loop(service_registry: ServiceRegistry) -> None:
    """
    Background loop: transition 'starting' instances to 'running' once their
    /health endpoint responds with 200, or to 'stopped' after they have spent
    more than _PROBE_TIMEOUT_S seconds in the 'starting' state.

    Sleeps _PROBE_INTERVAL_S seconds between passes and runs until the
    surrounding task is cancelled. A failure inside a single pass is logged
    and the loop carries on, so one bad pass does not end health probing for
    the lifetime of the process.
    """
    import asyncio

    start_times: dict[str, float] = {}  # instance key → time first seen as starting

    while True:
        await asyncio.sleep(_PROBE_INTERVAL_S)
        try:
            await _probe_starting_instances(service_registry, start_times)
        except asyncio.CancelledError:
            # Cancellation is the normal shutdown path; never swallow it.
            raise
        except Exception:
            logger.exception("Instance probe pass failed; retrying on next interval")
class LeaseRequest(BaseModel): class LeaseRequest(BaseModel):
node_id: str node_id: str
@ -61,10 +114,12 @@ def create_coordinator_app(
@asynccontextmanager
async def _lifespan(app: FastAPI):  # type: ignore[type-arg]
    """
    Application lifespan: start the background loops when the app starts and
    tear them down when it stops.

    Starts the agent supervisor's heartbeat loop and the instance probe loop
    as asyncio tasks. On exit the supervisor is stopped and both tasks are
    cancelled and awaited.
    """
    import asyncio

    heartbeat_task = asyncio.create_task(agent_supervisor.run_heartbeat_loop())
    probe_task = asyncio.create_task(_run_instance_probe_loop(service_registry))
    try:
        yield
    finally:
        # Runs even when an exception is thrown into the generator at the
        # yield, so the background tasks are never left running.
        agent_supervisor.stop()
        heartbeat_task.cancel()
        probe_task.cancel()
        # Wait for the cancellations to be processed so no task is still
        # pending when the event loop shuts down; return_exceptions=True
        # keeps the resulting CancelledError from propagating out of here.
        await asyncio.gather(heartbeat_task, probe_task, return_exceptions=True)
app = FastAPI(title="cf-orch-coordinator", lifespan=_lifespan) app = FastAPI(title="cf-orch-coordinator", lifespan=_lifespan)