feat(orch): background health probe loop — starting → running transition
Coordinator now polls all 'starting' instances every 5 s via GET /health. On 200: state → running. After 300 s without a healthy response: state → stopped. Closes #10.
This commit is contained in:
parent
bd132851ec
commit
a7290c1240
1 changed files with 57 additions and 2 deletions
|
|
@ -1,9 +1,14 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import time
|
||||
import urllib.request
|
||||
from contextlib import asynccontextmanager
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
from fastapi import FastAPI, HTTPException
|
||||
from fastapi.responses import HTMLResponse
|
||||
from pydantic import BaseModel
|
||||
|
|
@ -17,6 +22,54 @@ from circuitforge_core.resources.coordinator.service_registry import ServiceRegi
|
|||
|
||||
_DASHBOARD_HTML = (Path(__file__).parent / "dashboard.html").read_text()
|
||||
|
||||
_PROBE_INTERVAL_S = 5.0 # how often to poll starting instances
|
||||
_PROBE_TIMEOUT_S = 300.0 # give up and mark stopped after this many seconds
|
||||
|
||||
|
||||
async def _run_instance_probe_loop(service_registry: ServiceRegistry) -> None:
|
||||
"""
|
||||
Background loop: transition 'starting' instances to 'running' once their
|
||||
/health endpoint responds, or to 'stopped' after PROBE_TIMEOUT_S.
|
||||
"""
|
||||
import asyncio
|
||||
|
||||
start_times: dict[str, float] = {} # instance key → time first seen as starting
|
||||
|
||||
while True:
|
||||
await asyncio.sleep(_PROBE_INTERVAL_S)
|
||||
now = time.time()
|
||||
for inst in service_registry.all_instances():
|
||||
if inst.state != "starting":
|
||||
start_times.pop(f"{inst.service}:{inst.node_id}:{inst.gpu_id}", None)
|
||||
continue
|
||||
key = f"{inst.service}:{inst.node_id}:{inst.gpu_id}"
|
||||
start_times.setdefault(key, now)
|
||||
|
||||
healthy = False
|
||||
if inst.url:
|
||||
try:
|
||||
with urllib.request.urlopen(
|
||||
inst.url.rstrip("/") + "/health", timeout=2.0
|
||||
) as resp:
|
||||
healthy = resp.status == 200
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if healthy:
|
||||
service_registry.upsert_instance(
|
||||
service=inst.service, node_id=inst.node_id, gpu_id=inst.gpu_id,
|
||||
state="running", model=inst.model, url=inst.url,
|
||||
)
|
||||
start_times.pop(key, None)
|
||||
logger.info("Instance %s/%s gpu=%s transitioned to running", inst.service, inst.node_id, inst.gpu_id)
|
||||
elif now - start_times[key] > _PROBE_TIMEOUT_S:
|
||||
service_registry.upsert_instance(
|
||||
service=inst.service, node_id=inst.node_id, gpu_id=inst.gpu_id,
|
||||
state="stopped", model=inst.model, url=inst.url,
|
||||
)
|
||||
start_times.pop(key, None)
|
||||
logger.warning("Instance %s/%s gpu=%s timed out in starting state — marked stopped", inst.service, inst.node_id, inst.gpu_id)
|
||||
|
||||
|
||||
class LeaseRequest(BaseModel):
|
||||
node_id: str
|
||||
|
|
@ -61,10 +114,12 @@ def create_coordinator_app(
|
|||
@asynccontextmanager
|
||||
async def _lifespan(app: FastAPI): # type: ignore[type-arg]
|
||||
import asyncio
|
||||
task = asyncio.create_task(agent_supervisor.run_heartbeat_loop())
|
||||
heartbeat_task = asyncio.create_task(agent_supervisor.run_heartbeat_loop())
|
||||
probe_task = asyncio.create_task(_run_instance_probe_loop(service_registry))
|
||||
yield
|
||||
agent_supervisor.stop()
|
||||
task.cancel()
|
||||
heartbeat_task.cancel()
|
||||
probe_task.cancel()
|
||||
|
||||
app = FastAPI(title="cf-orch-coordinator", lifespan=_lifespan)
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue