feat(orch): background health probe loop — starting → running transition

Coordinator now polls all 'starting' instances every 5 s via GET /health.
On 200: state → running. After 300 s without a healthy response: state →
stopped. Closes #10.
This commit is contained in:
pyr0ball 2026-04-02 17:18:16 -07:00
parent bd132851ec
commit a7290c1240

View file

@ -1,9 +1,14 @@
from __future__ import annotations
import logging
import time
import urllib.request
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any
logger = logging.getLogger(__name__)
from fastapi import FastAPI, HTTPException
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
@ -17,6 +22,54 @@ from circuitforge_core.resources.coordinator.service_registry import ServiceRegi
# Dashboard page markup, read once at import time from the file that sits
# next to this module.
_DASHBOARD_HTML = Path(__file__).with_name("dashboard.html").read_text()

# Seconds between successive polls of instances in the 'starting' state.
_PROBE_INTERVAL_S = 5.0
# Seconds an instance may stay 'starting' before it is given up on and
# marked stopped.
_PROBE_TIMEOUT_S = 5 * 60.0
def _probe_instance_health(url: str, timeout: float = 2.0) -> bool:
    """Return True when ``GET <url>/health`` answers with HTTP 200.

    The call blocks for up to *timeout* seconds, so coroutines must run it
    in a worker thread instead of calling it directly.

    Args:
        url: Base URL of the instance; a trailing slash is tolerated.
        timeout: Socket timeout in seconds for the request.

    Returns:
        True only for a 200 response; False for any other status or any
        failure to complete the request.
    """
    try:
        with urllib.request.urlopen(
            url.rstrip("/") + "/health", timeout=timeout
        ) as resp:
            return resp.status == 200
    except Exception as exc:
        # Deliberately best-effort: an instance that is still booting is
        # expected to refuse connections, time out or return errors. Keep
        # the reason at debug level instead of discarding it.
        logger.debug("Health probe for %s failed: %s", url, exc)
        return False


async def _run_instance_probe_loop(service_registry: ServiceRegistry) -> None:
    """
    Background loop: transition 'starting' instances to 'running' once their
    /health endpoint responds, or to 'stopped' after _PROBE_TIMEOUT_S.

    Every _PROBE_INTERVAL_S seconds the loop reads all instances from
    *service_registry*, probes the ones in state 'starting' that have a URL,
    and writes the new state back with ``upsert_instance``. Instances without
    a URL can never become healthy and therefore only leave 'starting' via
    the timeout.

    The blocking HTTP probes run in the default thread-pool executor, all at
    once, so the event loop keeps serving requests while they are in flight.

    The coroutine never returns; it ends only when its task is cancelled.
    """
    import asyncio

    def instance_key(inst: Any) -> str:
        # Identity of an instance for timeout bookkeeping.
        return f"{inst.service}:{inst.node_id}:{inst.gpu_id}"

    loop = asyncio.get_running_loop()
    # instance key -> monotonic time at which it was first seen as starting
    start_times: dict[str, float] = {}

    while True:
        await asyncio.sleep(_PROBE_INTERVAL_S)
        # Monotonic clock: a wall-clock adjustment must not shorten or
        # stretch the starting timeout.
        now = time.monotonic()
        try:
            starting = [
                inst
                for inst in service_registry.all_instances()
                if inst.state == "starting"
            ]
            # Rebuild the bookkeeping from the current snapshot. This drops
            # instances that left 'starting' as well as instances that
            # disappeared from the registry, so a later instance reusing the
            # same key starts with a fresh timestamp.
            start_times = {
                key: start_times.get(key, now)
                for key in map(instance_key, starting)
            }

            targets = [(instance_key(inst), inst.url) for inst in starting if inst.url]
            outcomes = await asyncio.gather(
                *(
                    loop.run_in_executor(None, _probe_instance_health, url)
                    for _, url in targets
                )
            )
            healthy = {target for target, ok in zip(targets, outcomes) if ok}

            # The registry may have changed while the probes were running,
            # so re-read it and only touch instances that are still starting.
            for inst in list(service_registry.all_instances()):
                key = instance_key(inst)
                if inst.state != "starting" or key not in start_times:
                    continue
                if (key, inst.url) in healthy:
                    service_registry.upsert_instance(
                        service=inst.service, node_id=inst.node_id, gpu_id=inst.gpu_id,
                        state="running", model=inst.model, url=inst.url,
                    )
                    start_times.pop(key, None)
                    logger.info(
                        "Instance %s/%s gpu=%s transitioned to running",
                        inst.service, inst.node_id, inst.gpu_id,
                    )
                elif now - start_times[key] > _PROBE_TIMEOUT_S:
                    service_registry.upsert_instance(
                        service=inst.service, node_id=inst.node_id, gpu_id=inst.gpu_id,
                        state="stopped", model=inst.model, url=inst.url,
                    )
                    start_times.pop(key, None)
                    logger.warning(
                        "Instance %s/%s gpu=%s timed out in starting state — marked stopped",
                        inst.service, inst.node_id, inst.gpu_id,
                    )
        except asyncio.CancelledError:
            # Explicit re-raise so shutdown works even on interpreters where
            # CancelledError still derives from Exception.
            raise
        except Exception:
            # One failed pass must not end the background task for good.
            logger.exception("Instance probe pass failed; retrying on next interval")
class LeaseRequest(BaseModel):
node_id: str
@ -61,10 +114,12 @@ def create_coordinator_app(
@asynccontextmanager
async def _lifespan(app: FastAPI):  # type: ignore[type-arg]
    """Run the coordinator's background loops for the lifetime of the app.

    On startup two tasks are created: the agent supervisor's heartbeat loop
    and the instance health-probe loop. On shutdown the supervisor is told
    to stop and both tasks are cancelled.

    NOTE(review): ``agent_supervisor`` and ``service_registry`` are not
    defined in this function; presumably they are bound in the enclosing
    ``create_coordinator_app`` scope — confirm.
    """
    import asyncio

    # Exactly one heartbeat task: the former ``task`` variable was renamed
    # to ``heartbeat_task``; keeping both would run the loop twice.
    heartbeat_task = asyncio.create_task(agent_supervisor.run_heartbeat_loop())
    probe_task = asyncio.create_task(_run_instance_probe_loop(service_registry))
    try:
        yield
    finally:
        # Clean up even when an exception is delivered at the yield, so the
        # background tasks do not outlive the application.
        agent_supervisor.stop()
        heartbeat_task.cancel()
        probe_task.cancel()
app = FastAPI(title="cf-orch-coordinator", lifespan=_lifespan)