avocet/app/cforch.py
pyr0ball 9fdaeeb3d6 feat: multi-bench dashboard, API path migration, benchmark reliability fixes
- dashboard: eval card now shows last run + score for all bench types
  (classifier, LLM, style, plans) via new _get_recent_bench_runs()
- dashboard: skip cforch LLM-bench list summaries when scanning for
  classifier best_macro_f1 (fixes _find_latest_classifier_bench)
- cforch: stale _BENCH_RUNNING flag now auto-resets if process exited;
  idle timeout (120s via select) kills hung benchmark if node crashes
- api: add /api/finetune/{run,cancel} backward-compat shims while
  ClassifierTab fine-tune section is migrated to TrainJobsView
- ClassifierTab: migrate all /api/benchmark/* paths to /api/cforch/*;
  fix null-safety on results.models access; load fine-tuned models from
  /api/train/results instead of /api/finetune/status
- CompareTab: extend model picker to include vllm + cf-text alongside
  ollama, grouped by service; pre-select all LLM_SERVICES on load
- LlmEvalTab: null-safety on quality_by_task_type lookups
- models: AVOCET_MODELS_DIR env var overrides default models/ path
2026-05-11 09:05:12 -07:00

549 lines
22 KiB
Python

"""Avocet — cf-orch benchmark integration API.
Wraps cf-orch's benchmark.py script and exposes it via the Avocet API.
Config is read from label_tool.yaml under the `cforch:` key.
All endpoints are registered on `router` (a FastAPI APIRouter).
api.py includes this router with prefix="/api/cforch".
Module-level globals (_CONFIG_DIR, _BENCH_RUNNING, _bench_proc) follow the
same testability pattern as sft.py — override _CONFIG_DIR via set_config_dir()
in test fixtures.
"""
from __future__ import annotations
import json
import logging
import os
import re
import select as _select
import subprocess as _subprocess
import tempfile
from pathlib import Path
from typing import Any
import urllib.parse
import yaml
from fastapi import APIRouter, HTTPException
from fastapi.responses import StreamingResponse
logger = logging.getLogger(__name__)
_ROOT = Path(__file__).parent.parent
_CONFIG_DIR: Path | None = None # override in tests
_BENCH_RUNNING: bool = False
_bench_proc: Any = None # live Popen object while benchmark runs
router = APIRouter()
# ── Testability seams ──────────────────────────────────────────────────────────
def set_config_dir(path: Path | None) -> None:
global _CONFIG_DIR
_CONFIG_DIR = path
# ── Internal helpers ───────────────────────────────────────────────────────────
def _config_file() -> Path:
if _CONFIG_DIR is not None:
return _CONFIG_DIR / "label_tool.yaml"
return _ROOT / "config" / "label_tool.yaml"
def _load_cforch_config() -> dict:
"""Read label_tool.yaml cforch section, falling back to environment variables.
Priority (highest to lowest):
1. label_tool.yaml cforch: key
2. Environment variables (CF_ORCH_URL, CF_LICENSE_KEY, OLLAMA_HOST, OLLAMA_MODEL)
"""
f = _config_file()
file_cfg: dict = {}
if f.exists():
try:
raw = yaml.safe_load(f.read_text(encoding="utf-8")) or {}
file_cfg = raw.get("cforch", {}) or {}
except yaml.YAMLError as exc:
logger.warning("Failed to parse cforch config %s: %s", f, exc)
# Env var fallbacks — only used when the yaml key is absent or empty
def _coalesce(file_val: str, env_key: str) -> str:
return file_val if file_val else os.environ.get(env_key, "")
return {
**file_cfg,
"coordinator_url": _coalesce(file_cfg.get("coordinator_url", ""), "CF_ORCH_URL"),
"license_key": _coalesce(file_cfg.get("license_key", ""), "CF_LICENSE_KEY"),
"ollama_url": _coalesce(file_cfg.get("ollama_url", ""), "OLLAMA_HOST"),
"ollama_model": _coalesce(file_cfg.get("ollama_model", ""), "OLLAMA_MODEL"),
"judge_url": _coalesce(file_cfg.get("judge_url", ""), "CF_JUDGE_URL"),
"hf_token": _coalesce(file_cfg.get("hf_token", ""), "HF_TOKEN"),
}
def _validate_service_url(url: str, param_name: str) -> str:
"""Validate that a URL is a well-formed http/https URL with a hostname.
Guards against SSRF: only http/https is allowed; the URL must have a
non-empty host. Does not enforce an allowlist — call sites are internal
tooling, not a public API.
"""
if not url:
return url
try:
parsed = urllib.parse.urlparse(url)
except Exception:
raise HTTPException(400, f"{param_name}: not a valid URL")
if parsed.scheme not in ("http", "https"):
raise HTTPException(400, f"{param_name}: URL must start with http:// or https://")
if not parsed.hostname:
raise HTTPException(400, f"{param_name}: URL has no hostname")
return url
def _strip_ansi(text: str) -> str:
"""Remove ANSI escape codes from a string."""
return re.sub(r'\x1b\[[0-9;]*m', '', text)
def _find_latest_summary(results_dir: str | None) -> Path | None:
"""Find the newest summary.json under results_dir, or None if not found."""
if not results_dir:
return None
rdir = Path(results_dir)
if not rdir.exists():
return None
# Subdirs are named YYYY-MM-DD-HHMMSS; sort lexicographically for chronological order
subdirs = sorted(
[d for d in rdir.iterdir() if d.is_dir()],
key=lambda d: d.name,
)
for subdir in reversed(subdirs):
summary = subdir / "summary.json"
if summary.exists():
return summary
return None
# ── GET /tasks ─────────────────────────────────────────────────────────────────
@router.get("/tasks")
def get_tasks() -> dict:
"""Return task list from bench_tasks.yaml."""
cfg = _load_cforch_config()
tasks_path = cfg.get("bench_tasks", "")
if not tasks_path:
return {"tasks": [], "types": []}
p = Path(tasks_path)
if not p.exists():
return {"tasks": [], "types": []}
try:
raw = yaml.safe_load(p.read_text(encoding="utf-8")) or {}
except yaml.YAMLError as exc:
logger.warning("Failed to parse bench_tasks.yaml %s: %s", p, exc)
return {"tasks": [], "types": []}
tasks_raw = raw.get("tasks", []) or []
tasks: list[dict] = []
seen_types: list[str] = []
types_set: set[str] = set()
for t in tasks_raw:
if not isinstance(t, dict):
continue
tasks.append({
"id": t.get("id", ""),
"name": t.get("name", ""),
"type": t.get("type", ""),
"prompt": (t.get("prompt") or "").strip(),
"system": (t.get("system") or "").strip(),
})
task_type = t.get("type", "")
if task_type and task_type not in types_set:
seen_types.append(task_type)
types_set.add(task_type)
return {"tasks": tasks, "types": seen_types}
# ── GET /models ────────────────────────────────────────────────────────────────
# Services and roles surfaced in the benchmark model picker.
# Covers all cf-orch service types that benchmark.py can route tasks to.
_BENCH_SERVICES = frozenset({
"cf-text", "vllm", # LLM text generation
"cf-stt", # speech-to-text
"cf-tts", # text-to-speech
"cf-vision", # image classification / embedding
"cf-voice", # audio context classification
})
_BENCH_ROLES = frozenset({
"generator", "vlm", # LLM roles
"stt", "alm", # speech recognition
"tts", # speech synthesis
"vision", "embedding", # image understanding
"classifier", # audio classification (cf-voice)
})
@router.get("/models")
def get_models() -> dict:
"""Return model list from bench_models.yaml merged with locally installed models.
bench_models.yaml entries are listed first and take precedence; any installed
model whose repo_id is already present in the YAML is skipped. Only models
whose service is in _BENCH_SERVICES (cf-text, vllm, cf-stt, cf-tts, cf-vision,
cf-voice) are surfaced from the installed registry.
"""
cfg = _load_cforch_config()
models_path = cfg.get("bench_models", "")
models: list[dict] = []
bench_ids: set[str] = set()
if models_path:
p = Path(models_path)
if p.exists():
try:
raw = yaml.safe_load(p.read_text(encoding="utf-8")) or {}
except yaml.YAMLError as exc:
logger.warning("Failed to parse bench_models.yaml %s: %s", p, exc)
raw = {}
for m in (raw.get("models", []) or []):
if not isinstance(m, dict):
continue
model_id = m.get("id", "")
models.append({
"name": m.get("name", ""),
"id": model_id,
"service": m.get("service", "ollama"),
"tags": m.get("tags", []) or [],
"vram_estimate_mb": m.get("vram_estimate_mb", 0),
})
if model_id:
bench_ids.add(model_id)
# Merge installed generator models not already in bench_models.yaml.
try:
from app.models import list_installed # local import avoids circular dependency at module load
for installed in list_installed():
model_id: str = installed.get("model_id") or ""
service: str = installed.get("service") or ""
role: str = installed.get("role") or ""
if not model_id:
continue
if service not in _BENCH_SERVICES or role not in _BENCH_ROLES:
continue
if model_id in bench_ids:
continue
display_name = model_id.split("/", 1)[-1] if "/" in model_id else model_id
models.append({
"name": display_name,
"id": model_id,
"service": service,
"tags": [role],
"vram_estimate_mb": installed.get("vram_mb") or 0,
})
bench_ids.add(model_id)
except Exception as exc:
logger.warning("Could not merge installed models into model list: %s", exc)
return {"models": models}
# ── GET /run ───────────────────────────────────────────────────────────────────
@router.get("/nodes")
def get_nodes() -> dict:
"""Proxy the coordinator's /api/nodes list, returning node_id + online status.
Online is inferred from last_heartbeat: any node with a recent heartbeat is online.
Returns an empty list if the coordinator is unreachable.
"""
cfg = _load_cforch_config()
coordinator_url = cfg.get("coordinator_url", "").rstrip("/")
if not coordinator_url:
return {"nodes": []}
try:
import httpx as _httpx
resp = _httpx.get(f"{coordinator_url}/api/nodes", timeout=5.0)
resp.raise_for_status()
raw_nodes = resp.json().get("nodes", [])
return {
"nodes": [
{
"node_id": n.get("node_id", ""),
"online": n.get("last_heartbeat") is not None,
"gpus": [
{
"gpu_id": g.get("gpu_id"),
"name": g.get("name", ""),
"vram_total_mb": g.get("vram_total_mb", 0),
"vram_free_mb": g.get("vram_free_mb", 0),
}
for g in n.get("gpus", [])
],
}
for n in raw_nodes
]
}
except Exception as exc:
logger.warning("Could not fetch nodes from coordinator: %s", exc)
return {"nodes": []}
@router.get("/run")
def run_benchmark(
task_ids: str = "",
model_ids: str = "",
model_tags: str = "",
coordinator_url: str = "",
ollama_url: str = "",
judge_url: str = "",
judge_backend: str = "chat",
workers: int = 1,
node_ids: str = "",
) -> StreamingResponse:
"""Spawn cf-orch benchmark.py and stream stdout as SSE progress events."""
global _BENCH_RUNNING, _bench_proc
# Check if the process is actually still alive; reset stale flag if not.
if _BENCH_RUNNING:
if _bench_proc is not None and _bench_proc.poll() is None:
raise HTTPException(409, "A benchmark is already running")
_BENCH_RUNNING = False
_bench_proc = None
cfg = _load_cforch_config()
bench_script = cfg.get("bench_script", "")
bench_tasks = cfg.get("bench_tasks", "")
bench_models = cfg.get("bench_models", "")
results_dir = cfg.get("results_dir", "")
python_bin = cfg.get("python_bin", "/devl/miniconda3/envs/cf/bin/python")
cfg_coordinator = cfg.get("coordinator_url", "")
cfg_ollama = cfg.get("ollama_url", "")
cfg_license_key = cfg.get("license_key", "")
cfg_judge_url = cfg.get("judge_url", "")
# Validate URL params before spawning the subprocess.
# _validate_service_url raises HTTPException on bad input (caught by FastAPI before streaming starts).
_validate_service_url(coordinator_url, "coordinator_url")
_validate_service_url(ollama_url, "ollama_url")
_validate_service_url(judge_url, "judge_url")
def generate():
global _BENCH_RUNNING, _bench_proc
if not bench_script or not Path(bench_script).exists():
yield f"data: {json.dumps({'type': 'error', 'message': 'bench_script not configured or not found'})}\n\n"
return
# Build effective models file: bench_models.yaml + any installed models
# whose IDs were selected but are absent from the YAML (e.g. downloaded
# via the Models view). Written to a temp file so benchmark.py sees one
# unified list; cleaned up in the finally block.
effective_models_file = bench_models
_tmp_models_path: str | None = None
if model_ids and bench_models and Path(bench_models).exists():
requested_ids = set(model_ids.split(","))
try:
raw_bench = yaml.safe_load(Path(bench_models).read_text(encoding="utf-8")) or {}
bench_entries: list[dict] = raw_bench.get("models", []) or []
bench_id_set = {m.get("id", "") for m in bench_entries if isinstance(m, dict)}
missing_ids = requested_ids - bench_id_set
if missing_ids:
from app.models import list_installed
installed_map = {
m["model_id"]: m
for m in list_installed()
if m.get("model_id") and m.get("service") in _BENCH_SERVICES
}
extra: list[dict] = []
for mid in missing_ids:
if mid in installed_map:
inst = installed_map[mid]
entry: dict[str, Any] = {
"id": mid,
"name": mid.split("/", 1)[-1] if "/" in mid else mid,
"service": inst.get("service", "cf-text"),
"vram_estimate_mb": inst.get("vram_mb") or 0,
"tags": [inst.get("role", "generator")],
"temperature": 0.0,
}
local_path = inst.get("path", "") or inst.get("local_path", "")
if local_path:
entry["model_path"] = local_path
extra.append(entry)
if extra:
merged = {"models": bench_entries + extra}
tf = tempfile.NamedTemporaryFile(
mode="w", suffix=".yaml", delete=False,
prefix="avocet_bench_models_",
)
yaml.dump(merged, tf)
tf.close()
_tmp_models_path = tf.name
effective_models_file = _tmp_models_path
except Exception as exc:
logger.warning("Could not merge installed models into temp bench file: %s", exc)
cmd = [
python_bin,
bench_script,
"--tasks", bench_tasks,
"--models", effective_models_file,
"--output", results_dir,
]
if task_ids:
cmd.extend(["--filter-tasks"] + task_ids.split(","))
if model_ids:
cmd.extend(["--filter-models"] + model_ids.split(","))
if model_tags:
cmd.extend(["--filter-tags"] + model_tags.split(","))
# query param overrides config, config overrides env var (already resolved by _load_cforch_config)
effective_coordinator = coordinator_url if coordinator_url else cfg_coordinator
effective_ollama = ollama_url if ollama_url else cfg_ollama
if effective_coordinator:
cmd.extend(["--coordinator", effective_coordinator])
if effective_ollama:
cmd.extend(["--ollama-url", effective_ollama])
effective_judge = judge_url if judge_url else cfg_judge_url
if effective_judge:
cmd.extend(["--judge-url", effective_judge])
if judge_backend and judge_backend != "chat":
cmd.extend(["--judge-backend", judge_backend])
if workers > 1:
cmd.extend(["--workers", str(workers)])
if node_ids:
cmd.extend(["--nodes"] + node_ids.split(","))
# Pass license key as env var so subprocess can authenticate with cf-orch
proc_env = {**os.environ}
if cfg_license_key:
proc_env["CF_LICENSE_KEY"] = cfg_license_key
_BENCH_RUNNING = True
try:
proc = _subprocess.Popen(
cmd,
stdout=_subprocess.PIPE,
stderr=_subprocess.STDOUT,
text=True,
bufsize=1,
env=proc_env,
)
_bench_proc = proc
_IDLE_TIMEOUT_S = 120 # kill if no output for 2 minutes (node crash)
try:
while True:
ready = _select.select([proc.stdout], [], [], _IDLE_TIMEOUT_S)
if not ready[0]:
# No output for IDLE_TIMEOUT_S — node likely crashed
proc.terminate()
try:
proc.wait(timeout=5)
except _subprocess.TimeoutExpired:
proc.kill()
msg = f"Benchmark timed out — no output for {_IDLE_TIMEOUT_S}s (cluster node may have crashed)"
yield f"data: {json.dumps({'type': 'error', 'message': msg})}\n\n"
break
line = proc.stdout.readline()
if not line:
break
line = _strip_ansi(line.rstrip())
if line:
yield f"data: {json.dumps({'type': 'progress', 'message': line})}\n\n"
proc.wait()
if proc.returncode == 0:
summary_path = _find_latest_summary(results_dir)
if summary_path is not None:
try:
summary = json.loads(summary_path.read_text(encoding="utf-8"))
yield f"data: {json.dumps({'type': 'result', 'summary': summary})}\n\n"
except Exception as exc:
logger.warning("Failed to read summary.json: %s", exc)
yield f"data: {json.dumps({'type': 'complete'})}\n\n"
else:
yield f"data: {json.dumps({'type': 'error', 'message': f'Process exited with code {proc.returncode}'})}\n\n"
finally:
_bench_proc = None
except Exception as exc:
yield f"data: {json.dumps({'type': 'error', 'message': str(exc)})}\n\n"
finally:
_BENCH_RUNNING = False
if _tmp_models_path:
try:
os.unlink(_tmp_models_path)
except OSError:
pass
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)
# ── GET /config ────────────────────────────────────────────────────────────────
@router.get("/config")
def get_cforch_config() -> dict:
"""Return resolved cf-orch connection config (env vars merged with yaml).
Redacts license_key — only returns whether it is set, not the value.
Used by the Settings UI to show current connection state.
"""
cfg = _load_cforch_config()
return {
"coordinator_url": cfg.get("coordinator_url", ""),
"ollama_url": cfg.get("ollama_url", ""),
"ollama_model": cfg.get("ollama_model", ""),
"judge_url": cfg.get("judge_url", ""),
"license_key_set": bool(cfg.get("license_key", "")),
"source": "env" if not _config_file().exists() else "yaml+env",
}
# ── GET /results ───────────────────────────────────────────────────────────────
@router.get("/results")
def get_results() -> list:
"""Return the latest benchmark summary.json from results_dir."""
cfg = _load_cforch_config()
results_dir = cfg.get("results_dir", "")
summary_path = _find_latest_summary(results_dir)
if summary_path is None:
raise HTTPException(404, "No benchmark results found")
try:
return json.loads(summary_path.read_text(encoding="utf-8"))
except Exception as exc:
raise HTTPException(500, f"Failed to read summary.json: {exc}") from exc
# ── POST /cancel ───────────────────────────────────────────────────────────────
@router.post("/cancel")
def cancel_benchmark() -> dict:
"""Kill the running benchmark subprocess."""
global _BENCH_RUNNING, _bench_proc
if not _BENCH_RUNNING:
raise HTTPException(404, "No benchmark is currently running")
if _bench_proc is not None:
try:
_bench_proc.terminate()
except Exception as exc:
logger.warning("Failed to terminate benchmark process: %s", exc)
_BENCH_RUNNING = False
_bench_proc = None
return {"status": "cancelled"}