diff --git a/app/api.py b/app/api.py
index 0788e13..e1babf5 100644
--- a/app/api.py
+++ b/app/api.py
@@ -149,6 +149,9 @@ from app.models import router as models_router
import app.models as _models_module
app.include_router(models_router, prefix="/api/models")
+from app.cforch import router as cforch_router
+app.include_router(cforch_router, prefix="/api/cforch")
+
# In-memory last-action store (single user, local tool — in-memory is fine)
_last_action: dict | None = None
diff --git a/app/cforch.py b/app/cforch.py
new file mode 100644
index 0000000..27ca050
--- /dev/null
+++ b/app/cforch.py
@@ -0,0 +1,291 @@
+"""Avocet — cf-orch benchmark integration API.
+
+Wraps cf-orch's benchmark.py script and exposes it via the Avocet API.
+Config is read from label_tool.yaml under the `cforch:` key.
+
+All endpoints are registered on `router` (a FastAPI APIRouter).
+api.py includes this router with prefix="/api/cforch".
+
+Module-level globals (_CONFIG_DIR, _BENCH_RUNNING, _bench_proc) follow the
+same testability pattern as sft.py — override _CONFIG_DIR via set_config_dir()
+in test fixtures.
+"""
+from __future__ import annotations
+
+import json
+import logging
+import re
+import subprocess as _subprocess
+from pathlib import Path
+from typing import Any
+
+import yaml
+from fastapi import APIRouter, HTTPException
+from fastapi.responses import StreamingResponse
+
+logger = logging.getLogger(__name__)
+
+_ROOT = Path(__file__).parent.parent
+_CONFIG_DIR: Path | None = None # override in tests
+_BENCH_RUNNING: bool = False
+_bench_proc: Any = None # live Popen object while benchmark runs
+
+router = APIRouter()
+
+
+# ── Testability seams ──────────────────────────────────────────────────────────
+
+def set_config_dir(path: Path | None) -> None:
+ global _CONFIG_DIR
+ _CONFIG_DIR = path
+
+
+# ── Internal helpers ───────────────────────────────────────────────────────────
+
def _config_file() -> Path:
    """Resolve label_tool.yaml: the test-override dir if set, else <repo>/config."""
    base = _CONFIG_DIR if _CONFIG_DIR is not None else _ROOT / "config"
    return base / "label_tool.yaml"
+
+
def _load_cforch_config() -> dict:
    """Read label_tool.yaml and return its `cforch` sub-dict.

    Returns {} when the file is missing, unparsable, or its top level is not
    a mapping.  The original called `raw.get(...)` unconditionally, which
    raises AttributeError when the YAML root is a list or scalar — a config
    typo would then 500 every endpoint instead of degrading gracefully.
    """
    f = _config_file()
    if not f.exists():
        return {}
    try:
        raw = yaml.safe_load(f.read_text(encoding="utf-8")) or {}
    except yaml.YAMLError as exc:
        logger.warning("Failed to parse cforch config %s: %s", f, exc)
        return {}
    if not isinstance(raw, dict):
        logger.warning("cforch config %s: top level is not a mapping", f)
        return {}
    cforch = raw.get("cforch", {}) or {}
    # Guard the sub-key the same way: `cforch:` could hold a list/scalar.
    return cforch if isinstance(cforch, dict) else {}
+
+
+def _strip_ansi(text: str) -> str:
+ """Remove ANSI escape codes from a string."""
+ return re.sub(r'\x1b\[[0-9;]*m', '', text)
+
+
+def _find_latest_summary(results_dir: str | None) -> Path | None:
+ """Find the newest summary.json under results_dir, or None if not found."""
+ if not results_dir:
+ return None
+ rdir = Path(results_dir)
+ if not rdir.exists():
+ return None
+ # Subdirs are named YYYY-MM-DD-HHMMSS; sort lexicographically for chronological order
+ subdirs = sorted(
+ [d for d in rdir.iterdir() if d.is_dir()],
+ key=lambda d: d.name,
+ )
+ for subdir in reversed(subdirs):
+ summary = subdir / "summary.json"
+ if summary.exists():
+ return summary
+ return None
+
+
+# ── GET /tasks ─────────────────────────────────────────────────────────────────
+
@router.get("/tasks")
def get_tasks() -> dict:
    """Return the task list and distinct task types from bench_tasks.yaml.

    Response: {"tasks": [{id, name, type}, ...], "types": [str, ...]} with
    types deduplicated in first-seen order.  Any missing or malformed config
    degrades to empty lists.  Fix: the original called `raw.get(...)` without
    checking that the YAML root is a mapping, so a file whose top level is a
    list/scalar raised AttributeError (HTTP 500) instead of degrading.
    """
    empty: dict = {"tasks": [], "types": []}
    cfg = _load_cforch_config()
    tasks_path = cfg.get("bench_tasks", "")
    if not tasks_path:
        return empty

    p = Path(tasks_path)
    if not p.exists():
        return empty

    try:
        raw = yaml.safe_load(p.read_text(encoding="utf-8")) or {}
    except yaml.YAMLError as exc:
        logger.warning("Failed to parse bench_tasks.yaml %s: %s", p, exc)
        return empty
    if not isinstance(raw, dict):
        logger.warning("bench_tasks.yaml %s: top level is not a mapping", p)
        return empty

    tasks = [
        {"id": t.get("id", ""), "name": t.get("name", ""), "type": t.get("type", "")}
        for t in (raw.get("tasks", []) or [])
        if isinstance(t, dict)
    ]
    # dict.fromkeys deduplicates while preserving first-seen order.
    types = list(dict.fromkeys(t["type"] for t in tasks if t["type"]))
    return {"tasks": tasks, "types": types}
+
+
+# ── GET /models ────────────────────────────────────────────────────────────────
+
@router.get("/models")
def get_models() -> dict:
    """Return the model list from bench_models.yaml.

    Response: {"models": [{name, id, service, tags, vram_estimate_mb}, ...]}.
    Missing or malformed config degrades to an empty list.  Fix: the original
    called `raw.get(...)` without checking that the YAML root is a mapping,
    so a file whose top level is a list/scalar raised AttributeError
    (HTTP 500) instead of degrading.
    """
    empty: dict = {"models": []}
    cfg = _load_cforch_config()
    models_path = cfg.get("bench_models", "")
    if not models_path:
        return empty

    p = Path(models_path)
    if not p.exists():
        return empty

    try:
        raw = yaml.safe_load(p.read_text(encoding="utf-8")) or {}
    except yaml.YAMLError as exc:
        logger.warning("Failed to parse bench_models.yaml %s: %s", p, exc)
        return empty
    if not isinstance(raw, dict):
        logger.warning("bench_models.yaml %s: top level is not a mapping", p)
        return empty

    models = [
        {
            "name": m.get("name", ""),
            "id": m.get("id", ""),
            "service": m.get("service", "ollama"),
            "tags": m.get("tags", []) or [],
            "vram_estimate_mb": m.get("vram_estimate_mb", 0),
        }
        for m in (raw.get("models", []) or [])
        if isinstance(m, dict)
    ]
    return {"models": models}
+
+
+# ── GET /run ───────────────────────────────────────────────────────────────────
+
@router.get("/run")
def run_benchmark(
    task_ids: str = "",
    model_tags: str = "",
    coordinator_url: str = "",
    ollama_url: str = "",
) -> StreamingResponse:
    """Spawn cf-orch benchmark.py and stream its stdout as SSE events.

    SSE event types (each a `data: {...}` frame):
      progress — one per non-empty stdout line (ANSI-stripped)
      result   — parsed summary.json after a zero exit (best-effort)
      complete — successful end of run
      error    — misconfiguration, spawn failure, or non-zero exit

    Query params filter tasks/models (comma-separated) and override the
    coordinator/ollama URLs from config.

    Fix: the original set _BENCH_RUNNING inside the generator, which only
    executes once the client starts consuming the stream — so two quick
    requests could both pass the 409 check (TOCTOU).  The slot is now claimed
    before the response is returned; generate() always releases it in its
    finally block (which also runs on client disconnect via GeneratorExit).
    """
    global _BENCH_RUNNING, _bench_proc

    if _BENCH_RUNNING:
        raise HTTPException(409, "A benchmark is already running")

    cfg = _load_cforch_config()
    bench_script = cfg.get("bench_script", "")
    bench_tasks = cfg.get("bench_tasks", "")
    bench_models = cfg.get("bench_models", "")
    results_dir = cfg.get("results_dir", "")
    python_bin = cfg.get("python_bin", "/devl/miniconda3/envs/cf/bin/python")
    cfg_coordinator = cfg.get("coordinator_url", "")
    cfg_ollama = cfg.get("ollama_url", "")

    # Claim the run slot *before* returning — see docstring.
    _BENCH_RUNNING = True

    def _sse(payload: dict) -> str:
        """Format one SSE data frame."""
        return f"data: {json.dumps(payload)}\n\n"

    def generate():
        global _BENCH_RUNNING, _bench_proc
        try:
            if not bench_script or not Path(bench_script).exists():
                yield _sse({'type': 'error', 'message': 'bench_script not configured or not found'})
                return

            cmd = [
                python_bin,
                bench_script,
                "--tasks", bench_tasks,
                "--models", bench_models,
                "--output", results_dir,
            ]
            if task_ids:
                cmd.extend(["--filter-tasks"] + task_ids.split(","))
            if model_tags:
                cmd.extend(["--filter-tags"] + model_tags.split(","))
            # Query params take precedence over config values.
            effective_coordinator = coordinator_url or cfg_coordinator
            effective_ollama = ollama_url or cfg_ollama
            if effective_coordinator:
                cmd.extend(["--coordinator", effective_coordinator])
            if effective_ollama:
                cmd.extend(["--ollama-url", effective_ollama])

            proc = _subprocess.Popen(
                cmd,
                stdout=_subprocess.PIPE,
                stderr=_subprocess.STDOUT,
                text=True,
                bufsize=1,  # line-buffered so progress streams promptly
            )
            _bench_proc = proc
            try:
                for line in proc.stdout:
                    line = _strip_ansi(line.rstrip())
                    if line:
                        yield _sse({'type': 'progress', 'message': line})
                proc.wait()
                if proc.returncode == 0:
                    summary_path = _find_latest_summary(results_dir)
                    if summary_path is not None:
                        try:
                            summary = json.loads(summary_path.read_text(encoding="utf-8"))
                            yield _sse({'type': 'result', 'summary': summary})
                        except Exception as exc:
                            # Summary is a bonus; still report completion.
                            logger.warning("Failed to read summary.json: %s", exc)
                    yield _sse({'type': 'complete'})
                else:
                    yield _sse({'type': 'error', 'message': f'Process exited with code {proc.returncode}'})
            finally:
                _bench_proc = None
        except Exception as exc:
            yield _sse({'type': 'error', 'message': str(exc)})
        finally:
            _BENCH_RUNNING = False

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
+
+
+# ── GET /results ───────────────────────────────────────────────────────────────
+
@router.get("/results")
def get_results() -> dict:
    """Return the most recent benchmark summary.json.

    Raises 404 when no results exist, 500 when the file cannot be read/parsed.
    """
    results_dir = _load_cforch_config().get("results_dir", "")
    summary_path = _find_latest_summary(results_dir)
    if summary_path is None:
        raise HTTPException(404, "No benchmark results found")
    try:
        return json.loads(summary_path.read_text(encoding="utf-8"))
    except Exception as exc:
        raise HTTPException(500, f"Failed to read summary.json: {exc}") from exc
+
+
+# ── POST /cancel ───────────────────────────────────────────────────────────────
+
@router.post("/cancel")
def cancel_benchmark() -> dict:
    """Terminate the running benchmark subprocess and clear run state.

    Raises 404 when nothing is running.  Termination failure is logged but
    still reported as cancelled — the state flags are reset either way.
    """
    global _BENCH_RUNNING, _bench_proc

    if not _BENCH_RUNNING:
        raise HTTPException(404, "No benchmark is currently running")

    proc = _bench_proc
    if proc is not None:
        try:
            proc.terminate()
        except Exception as exc:
            logger.warning("Failed to terminate benchmark process: %s", exc)

    _BENCH_RUNNING = False
    _bench_proc = None
    return {"status": "cancelled"}
diff --git a/tests/test_cforch.py b/tests/test_cforch.py
new file mode 100644
index 0000000..537cd5f
--- /dev/null
+++ b/tests/test_cforch.py
@@ -0,0 +1,282 @@
+"""Tests for app/cforch.py — /api/cforch/* endpoints."""
+from __future__ import annotations
+
+import json
+from pathlib import Path
+from unittest.mock import MagicMock, patch
+
+import pytest
+import yaml
+from fastapi.testclient import TestClient
+
+
+# ── Fixtures ───────────────────────────────────────────────────────────────────
+
@pytest.fixture(autouse=True)
def reset_cforch_globals(tmp_path):
    """Point cforch's _CONFIG_DIR at tmp_path; restore all globals on teardown."""
    from app import cforch as cforch_module

    saved = (
        cforch_module._CONFIG_DIR,
        cforch_module._BENCH_RUNNING,
        cforch_module._bench_proc,
    )

    cforch_module.set_config_dir(tmp_path)
    cforch_module._BENCH_RUNNING = False
    cforch_module._bench_proc = None

    yield tmp_path

    cforch_module.set_config_dir(saved[0])
    cforch_module._BENCH_RUNNING = saved[1]
    cforch_module._bench_proc = saved[2]
+
+
@pytest.fixture
def client():
    """TestClient bound to the full Avocet app (cforch router included)."""
    from app.api import app
    return TestClient(app)
+
+
@pytest.fixture
def config_dir(reset_cforch_globals):
    """Alias for the tmp config dir installed by the autouse fixture."""
    return reset_cforch_globals
+
+
def _write_config(config_dir: Path, cforch_cfg: dict) -> None:
    """Dump {"cforch": cforch_cfg} as label_tool.yaml into config_dir."""
    target = config_dir / "label_tool.yaml"
    target.write_text(yaml.dump({"cforch": cforch_cfg}), encoding="utf-8")
+
+
def _write_tasks_yaml(path: Path, tasks: list[dict]) -> None:
    """Serialize a bench_tasks.yaml containing the given task list."""
    payload = {"tasks": tasks}
    path.write_text(yaml.dump(payload), encoding="utf-8")
+
+
def _write_models_yaml(path: Path, models: list[dict]) -> None:
    """Serialize a bench_models.yaml containing the given model list."""
    payload = {"models": models}
    path.write_text(yaml.dump(payload), encoding="utf-8")
+
+
+# ── GET /tasks ─────────────────────────────────────────────────────────────────
+
def test_tasks_returns_empty_when_not_configured(client):
    """Without any label_tool.yaml the endpoint degrades to empty lists."""
    resp = client.get("/api/cforch/tasks")
    assert resp.status_code == 200
    assert resp.json() == {"tasks": [], "types": []}
+
+
def test_tasks_parses_yaml(client, config_dir, tmp_path):
    """Tasks and their types round-trip through /api/cforch/tasks."""
    expected = [
        {"id": "t1", "name": "Task One", "type": "instruction"},
        {"id": "t2", "name": "Task Two", "type": "reasoning"},
    ]
    tasks_file = tmp_path / "bench_tasks.yaml"
    _write_tasks_yaml(tasks_file, expected)
    _write_config(config_dir, {"bench_tasks": str(tasks_file)})

    resp = client.get("/api/cforch/tasks")
    assert resp.status_code == 200
    data = resp.json()
    assert data["tasks"] == expected
    assert "instruction" in data["types"]
    assert "reasoning" in data["types"]
+
+
def test_tasks_returns_types_deduplicated(client, config_dir, tmp_path):
    """A type shared by several tasks appears exactly once in `types`."""
    tasks_file = tmp_path / "bench_tasks.yaml"
    _write_tasks_yaml(tasks_file, [
        {"id": "t1", "name": "A", "type": "instruction"},
        {"id": "t2", "name": "B", "type": "instruction"},
        {"id": "t3", "name": "C", "type": "reasoning"},
    ])
    _write_config(config_dir, {"bench_tasks": str(tasks_file)})

    types = client.get("/api/cforch/tasks").json()["types"]
    assert types.count("instruction") == 1
    assert len(types) == 2
+
+
+# ── GET /models ────────────────────────────────────────────────────────────────
+
def test_models_returns_empty_when_not_configured(client):
    """Without any label_tool.yaml the endpoint returns no models."""
    resp = client.get("/api/cforch/models")
    assert resp.status_code == 200
    assert resp.json() == {"models": []}
+
+
def test_models_parses_bench_models_yaml(client, config_dir, tmp_path):
    """A fully-populated model entry is returned unchanged."""
    entry = {
        "name": "llama3",
        "id": "llama3:8b",
        "service": "ollama",
        "tags": ["fast", "small"],
        "vram_estimate_mb": 6000,
    }
    models_file = tmp_path / "bench_models.yaml"
    _write_models_yaml(models_file, [entry])
    _write_config(config_dir, {"bench_models": str(models_file)})

    resp = client.get("/api/cforch/models")
    assert resp.status_code == 200
    models = resp.json()["models"]
    assert len(models) == 1
    assert models[0] == entry
+
+
+# ── GET /run ───────────────────────────────────────────────────────────────────
+
def test_run_returns_409_when_already_running(client):
    """GET /run is rejected with 409 while _BENCH_RUNNING is set."""
    from app import cforch as cforch_module

    cforch_module._BENCH_RUNNING = True
    assert client.get("/api/cforch/run").status_code == 409
+
+
def test_run_returns_error_when_bench_script_not_configured(client):
    """With no config at all the SSE stream carries an error event."""
    resp = client.get("/api/cforch/run")
    assert resp.status_code == 200
    body = resp.text
    assert '"type": "error"' in body
    assert "bench_script not configured" in body
+
+
def test_run_streams_progress_events(client, config_dir, tmp_path):
    """With Popen mocked, stdout lines appear as SSE progress events."""
    bench_script = tmp_path / "fake_benchmark.py"
    bench_script.write_text("# fake", encoding="utf-8")
    tasks_file = tmp_path / "bench_tasks.yaml"
    _write_tasks_yaml(tasks_file, [])
    models_file = tmp_path / "bench_models.yaml"
    _write_models_yaml(models_file, [])
    results_dir = tmp_path / "results"
    results_dir.mkdir()

    _write_config(config_dir, {
        "bench_script": str(bench_script),
        "bench_tasks": str(tasks_file),
        "bench_models": str(models_file),
        "results_dir": str(results_dir),
        "python_bin": "/usr/bin/python3",
    })

    fake_proc = MagicMock()
    fake_proc.stdout = iter(["Running task 1\n", "Running task 2\n"])
    fake_proc.returncode = 1  # non-zero: no summary.json needed
    fake_proc.wait = MagicMock()

    with patch("app.cforch._subprocess.Popen", return_value=fake_proc):
        resp = client.get("/api/cforch/run")

    assert resp.status_code == 200
    body = resp.text
    assert '"type": "progress"' in body
    assert "Running task 1" in body
    assert "Running task 2" in body
+
+
def test_run_emits_result_on_success(client, config_dir, tmp_path):
    """Exit 0 plus an existing summary.json yields result + complete events."""
    bench_script = tmp_path / "fake_benchmark.py"
    bench_script.write_text("# fake", encoding="utf-8")
    tasks_file = tmp_path / "bench_tasks.yaml"
    _write_tasks_yaml(tasks_file, [])
    models_file = tmp_path / "bench_models.yaml"
    _write_models_yaml(models_file, [])

    results_dir = tmp_path / "results"
    run_dir = results_dir / "2026-04-08-120000"
    run_dir.mkdir(parents=True)
    (run_dir / "summary.json").write_text(
        json.dumps({"score": 0.92, "models_evaluated": 3}), encoding="utf-8"
    )

    _write_config(config_dir, {
        "bench_script": str(bench_script),
        "bench_tasks": str(tasks_file),
        "bench_models": str(models_file),
        "results_dir": str(results_dir),
        "python_bin": "/usr/bin/python3",
    })

    fake_proc = MagicMock()
    fake_proc.stdout = iter([])
    fake_proc.returncode = 0
    fake_proc.wait = MagicMock()

    with patch("app.cforch._subprocess.Popen", return_value=fake_proc):
        resp = client.get("/api/cforch/run")

    assert resp.status_code == 200
    body = resp.text
    assert '"type": "result"' in body
    assert '"score": 0.92' in body
    assert '"type": "complete"' in body
+
+
+# ── GET /results ───────────────────────────────────────────────────────────────
+
def test_results_returns_404_when_no_results(client):
    """With no results_dir configured, GET /results is a 404."""
    assert client.get("/api/cforch/results").status_code == 404
+
+
def test_results_returns_latest_summary(client, config_dir, tmp_path):
    """A single run dir with summary.json is served back verbatim."""
    summary = {"score": 0.88, "run": "test"}
    run_dir = tmp_path / "results" / "2026-04-08-150000"
    run_dir.mkdir(parents=True)
    (run_dir / "summary.json").write_text(json.dumps(summary), encoding="utf-8")

    _write_config(config_dir, {"results_dir": str(tmp_path / "results")})

    resp = client.get("/api/cforch/results")
    assert resp.status_code == 200
    assert resp.json() == summary
+
+
+# ── POST /cancel ───────────────────────────────────────────────────────────────
+
def test_cancel_returns_404_when_not_running(client):
    """POST /cancel with no benchmark in flight is a 404."""
    assert client.post("/api/cforch/cancel").status_code == 404
+
+
def test_cancel_terminates_running_benchmark(client):
    """POST /cancel terminates the proc and resets the running flags."""
    from app import cforch as cforch_module

    fake_proc = MagicMock()
    cforch_module._BENCH_RUNNING = True
    cforch_module._bench_proc = fake_proc

    resp = client.post("/api/cforch/cancel")
    assert resp.status_code == 200
    assert resp.json() == {"status": "cancelled"}
    fake_proc.terminate.assert_called_once()
    assert cforch_module._BENCH_RUNNING is False
    assert cforch_module._bench_proc is None
diff --git a/web/src/views/BenchmarkView.vue b/web/src/views/BenchmarkView.vue
index 8497181..81d8302 100644
--- a/web/src/views/BenchmarkView.vue
+++ b/web/src/views/BenchmarkView.vue
@@ -3,26 +3,219 @@
🏁 Benchmark
{{ llmError }}
+| Model | +overall | +{{ col }} | +tok/s | +
|---|---|---|---|
| {{ row.model_name }} | +{{ pct(row.avg_quality_score) }} | +{{ row.quality_by_task_type[col] != null ? pct(row.quality_by_task_type[col]) : '—' }} | +{{ row.avg_tokens_per_sec.toFixed(1) }} | +
Run LLM Eval on the Benchmark tab to refresh. Green = best per column.
+ + + + + +Highlighted cells are the best-scoring model per metric.
+ + +| Model | +overall | +{{ col }} | +tok/s | +
|---|---|---|---|
| {{ row.model_name }} | +{{ llmPct(row.avg_quality_score) }} | +{{ row.quality_by_task_type[col] != null ? llmPct(row.quality_by_task_type[col]) : '—' }} | +{{ row.avg_tokens_per_sec.toFixed(1) }} | +
Run LLM Eval on the Benchmark tab to refresh. Highlighted = best per column.
+ +data/email_score.jsonl
{{ fileSizeLabel }}
@@ -94,6 +132,18 @@ interface BenchmarkModelResult {
[key: string]: number | undefined
}
+interface LlmModelResult {
+ model_name: string
+ model_id: string
+ node_id: string
+ avg_tokens_per_sec: number
+ avg_completion_ms: number
+ avg_quality_score: number
+ finetune_candidates: number
+ error_count: number
+ quality_by_task_type: Record