import asyncio
import logging
import time
import urllib.request
from typing import Any

logger = logging.getLogger(__name__)

_PROBE_INTERVAL_S = 5.0  # how often to poll starting instances
_PROBE_TIMEOUT_S = 300.0  # give up and mark stopped after this many seconds
_HEALTH_REQUEST_TIMEOUT_S = 2.0  # socket timeout for a single /health request


def _instance_key(inst: Any) -> str:
    """Return the string identifying an instance by service, node and GPU."""
    return f"{inst.service}:{inst.node_id}:{inst.gpu_id}"


def _probe_health(url: str) -> bool:
    """Return True when ``<url>/health`` answers with HTTP 200.

    This call blocks, so it must run off the event loop. Probing is
    best-effort: any failure (connection refused, timeout, HTTP error status,
    malformed URL) counts as "not healthy yet" and is logged at debug level
    rather than raised.
    """
    try:
        with urllib.request.urlopen(
            url.rstrip("/") + "/health", timeout=_HEALTH_REQUEST_TIMEOUT_S
        ) as resp:
            return resp.status == 200
    except Exception as exc:  # best-effort probe boundary: never propagate
        logger.debug("Health probe for %s failed: %s", url, exc)
        return False


async def _probe_starting_instances(
    service_registry: "ServiceRegistry",
    start_times: dict,
    probe_timeout_s: float,
) -> None:
    """Run one probe pass over every instance currently in 'starting'.

    ``start_times`` maps instance key -> monotonic time the instance was first
    seen as starting; it is updated in place.
    """
    now = time.monotonic()
    starting = [
        inst for inst in service_registry.all_instances() if inst.state == "starting"
    ]
    starting_keys = {_instance_key(inst) for inst in starting}

    # Forget instances that left 'starting' or disappeared from the registry,
    # so the bookkeeping cannot grow without bound.
    for stale_key in start_times.keys() - starting_keys:
        del start_times[stale_key]
    for key in starting_keys:
        start_times.setdefault(key, now)

    # Probe concurrently on the default executor so the blocking HTTP calls
    # never stall the event loop.
    probed = [inst for inst in starting if inst.url]
    healthy_keys: set[str] = set()
    if probed:
        loop = asyncio.get_running_loop()
        outcomes = await asyncio.gather(
            *(loop.run_in_executor(None, _probe_health, inst.url) for inst in probed)
        )
        healthy_keys = {
            _instance_key(inst) for inst, ok in zip(probed, outcomes) if ok
        }

    # Re-read the registry: other coroutines may have changed or removed an
    # instance while the probes were awaited. Only instances that are still
    # 'starting' and were tracked before the probes are transitioned.
    for inst in list(service_registry.all_instances()):
        key = _instance_key(inst)
        if inst.state != "starting" or key not in start_times:
            continue

        if key in healthy_keys:
            new_state = "running"
        elif now - start_times[key] > probe_timeout_s:
            new_state = "stopped"
        else:
            continue

        service_registry.upsert_instance(
            service=inst.service, node_id=inst.node_id, gpu_id=inst.gpu_id,
            state=new_state, model=inst.model, url=inst.url,
        )
        del start_times[key]
        if new_state == "running":
            logger.info(
                "Instance %s/%s gpu=%s transitioned to running",
                inst.service, inst.node_id, inst.gpu_id,
            )
        else:
            logger.warning(
                "Instance %s/%s gpu=%s timed out in starting state — marked stopped",
                inst.service, inst.node_id, inst.gpu_id,
            )


async def _run_instance_probe_loop(
    service_registry: "ServiceRegistry",
    *,
    probe_interval_s: float = _PROBE_INTERVAL_S,
    probe_timeout_s: float = _PROBE_TIMEOUT_S,
) -> None:
    """
    Background loop: transition 'starting' instances to 'running' once their
    /health endpoint responds, or to 'stopped' after ``probe_timeout_s``.

    Runs until cancelled. The coordinator's lifespan handler starts it with
    ``asyncio.create_task(_run_instance_probe_loop(service_registry))`` and
    cancels the task on shutdown.

    Args:
        service_registry: registry providing ``all_instances()`` and
            ``upsert_instance(...)``.
        probe_interval_s: seconds to sleep between probe passes.
        probe_timeout_s: seconds an instance may stay in 'starting' (measured
            on the monotonic clock from when it was first seen) before it is
            marked 'stopped'.
    """
    start_times: dict[str, float] = {}  # instance key -> monotonic first-seen time

    while True:
        await asyncio.sleep(probe_interval_s)
        try:
            await _probe_starting_instances(
                service_registry, start_times, probe_timeout_s
            )
        except asyncio.CancelledError:
            raise
        except Exception:
            # One failed pass must not kill the loop; retry on the next tick.
            logger.exception("Instance probe pass failed")