feat(orch): background health probe loop — starting → running transition
Coordinator now polls all 'starting' instances every 5 s via GET /health. On 200: state → running. After 300 s without a healthy response: state → stopped. Closes #10.
This commit is contained in:
parent
bd132851ec
commit
a7290c1240
1 changed files with 57 additions and 2 deletions
|
|
@ -1,9 +1,14 @@
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
import urllib.request
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
from fastapi import FastAPI, HTTPException
|
from fastapi import FastAPI, HTTPException
|
||||||
from fastapi.responses import HTMLResponse
|
from fastapi.responses import HTMLResponse
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
|
@ -17,6 +22,54 @@ from circuitforge_core.resources.coordinator.service_registry import ServiceRegi
|
||||||
|
|
||||||
_DASHBOARD_HTML = (Path(__file__).parent / "dashboard.html").read_text()
|
_DASHBOARD_HTML = (Path(__file__).parent / "dashboard.html").read_text()
|
||||||
|
|
||||||
|
_PROBE_INTERVAL_S = 5.0 # how often to poll starting instances
|
||||||
|
_PROBE_TIMEOUT_S = 300.0 # give up and mark stopped after this many seconds
|
||||||
|
|
||||||
|
|
||||||
|
async def _run_instance_probe_loop(service_registry: ServiceRegistry) -> None:
|
||||||
|
"""
|
||||||
|
Background loop: transition 'starting' instances to 'running' once their
|
||||||
|
/health endpoint responds, or to 'stopped' after PROBE_TIMEOUT_S.
|
||||||
|
"""
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
start_times: dict[str, float] = {} # instance key → time first seen as starting
|
||||||
|
|
||||||
|
while True:
|
||||||
|
await asyncio.sleep(_PROBE_INTERVAL_S)
|
||||||
|
now = time.time()
|
||||||
|
for inst in service_registry.all_instances():
|
||||||
|
if inst.state != "starting":
|
||||||
|
start_times.pop(f"{inst.service}:{inst.node_id}:{inst.gpu_id}", None)
|
||||||
|
continue
|
||||||
|
key = f"{inst.service}:{inst.node_id}:{inst.gpu_id}"
|
||||||
|
start_times.setdefault(key, now)
|
||||||
|
|
||||||
|
healthy = False
|
||||||
|
if inst.url:
|
||||||
|
try:
|
||||||
|
with urllib.request.urlopen(
|
||||||
|
inst.url.rstrip("/") + "/health", timeout=2.0
|
||||||
|
) as resp:
|
||||||
|
healthy = resp.status == 200
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
if healthy:
|
||||||
|
service_registry.upsert_instance(
|
||||||
|
service=inst.service, node_id=inst.node_id, gpu_id=inst.gpu_id,
|
||||||
|
state="running", model=inst.model, url=inst.url,
|
||||||
|
)
|
||||||
|
start_times.pop(key, None)
|
||||||
|
logger.info("Instance %s/%s gpu=%s transitioned to running", inst.service, inst.node_id, inst.gpu_id)
|
||||||
|
elif now - start_times[key] > _PROBE_TIMEOUT_S:
|
||||||
|
service_registry.upsert_instance(
|
||||||
|
service=inst.service, node_id=inst.node_id, gpu_id=inst.gpu_id,
|
||||||
|
state="stopped", model=inst.model, url=inst.url,
|
||||||
|
)
|
||||||
|
start_times.pop(key, None)
|
||||||
|
logger.warning("Instance %s/%s gpu=%s timed out in starting state — marked stopped", inst.service, inst.node_id, inst.gpu_id)
|
||||||
|
|
||||||
|
|
||||||
class LeaseRequest(BaseModel):
|
class LeaseRequest(BaseModel):
|
||||||
node_id: str
|
node_id: str
|
||||||
|
|
@ -61,10 +114,12 @@ def create_coordinator_app(
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def _lifespan(app: FastAPI): # type: ignore[type-arg]
|
async def _lifespan(app: FastAPI): # type: ignore[type-arg]
|
||||||
import asyncio
|
import asyncio
|
||||||
task = asyncio.create_task(agent_supervisor.run_heartbeat_loop())
|
heartbeat_task = asyncio.create_task(agent_supervisor.run_heartbeat_loop())
|
||||||
|
probe_task = asyncio.create_task(_run_instance_probe_loop(service_registry))
|
||||||
yield
|
yield
|
||||||
agent_supervisor.stop()
|
agent_supervisor.stop()
|
||||||
task.cancel()
|
heartbeat_task.cancel()
|
||||||
|
probe_task.cancel()
|
||||||
|
|
||||||
app = FastAPI(title="cf-orch-coordinator", lifespan=_lifespan)
|
app = FastAPI(title="cf-orch-coordinator", lifespan=_lifespan)
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue