avocet/tests/test_cforch.py
pyr0ball dffb1d0d7a feat: cf-orch LLM benchmark integration (Phase 1)
Backend (app/cforch.py — new APIRouter at /api/cforch):
- GET /tasks — reads bench_tasks.yaml, returns tasks + deduplicated types
- GET /models — reads bench_models.yaml, returns model list with service/tags
- GET /run — SSE endpoint; spawns cf-orch benchmark.py subprocess with
  --filter-tasks, --filter-tags, --coordinator, --ollama-url; strips ANSI
  codes; emits progress/result/complete/error events; 409 guard on concurrency
- GET /results — returns latest bench_results/*/summary.json; 404 if none
- POST /cancel — terminates running benchmark subprocess
- All paths configurable via label_tool.yaml cforch: section
- 13 tests; follows sft.py/models.py testability seam pattern

Frontend:
- BenchmarkView: mode toggle (Classifier / LLM Eval); LLM Eval panel with
  task picker (by type, select-all + indeterminate), model picker (by service),
  SSE run log, results table with best-per-column highlighting
- StatsView: LLM Benchmark section showing quality_by_task_type table across
  models; hidden when no results; fetches /api/cforch/results on mount

SFT candidate pipeline: cf-orch runs that produce sft_candidates.jsonl are
auto-discovered by the existing bench_results_dir config in sft.py — no
additional wiring needed.
2026-04-09 10:46:06 -07:00

282 lines
10 KiB
Python

"""Tests for app/cforch.py — /api/cforch/* endpoints."""
from __future__ import annotations
import json
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
import yaml
from fastapi.testclient import TestClient
# ── Fixtures ───────────────────────────────────────────────────────────────────
@pytest.fixture(autouse=True)
def reset_cforch_globals(tmp_path):
    """Isolate every test from app.cforch module-level state.

    Before the test: remembers ``_CONFIG_DIR``, ``_BENCH_RUNNING`` and
    ``_bench_proc``, calls ``set_config_dir(tmp_path)`` and clears the two
    running-state globals. Yields ``tmp_path``. After the test: puts the
    three remembered values back.
    """
    from app import cforch as cforch_module

    # Snapshot the module state so it can be restored after the test.
    saved_config_dir, saved_running, saved_proc = (
        cforch_module._CONFIG_DIR,
        cforch_module._BENCH_RUNNING,
        cforch_module._bench_proc,
    )

    cforch_module.set_config_dir(tmp_path)
    cforch_module._BENCH_RUNNING = False
    cforch_module._bench_proc = None

    yield tmp_path

    # Teardown: undo everything done above, in the same order.
    cforch_module.set_config_dir(saved_config_dir)
    cforch_module._BENCH_RUNNING = saved_running
    cforch_module._bench_proc = saved_proc
@pytest.fixture
def client():
    """Build a ``TestClient`` around the application imported from ``app.api``."""
    # Imported lazily, inside the fixture, exactly as the other fixtures do.
    from app.api import app as application

    return TestClient(application)
@pytest.fixture
def config_dir(reset_cforch_globals):
    """Expose the temporary directory yielded by ``reset_cforch_globals``.

    That fixture has already passed the same directory to
    ``set_config_dir``, so files written here are the active configuration.
    """
    active_config_dir = reset_cforch_globals
    return active_config_dir
def _write_config(config_dir: Path, cforch_cfg: dict) -> None:
    """Create ``label_tool.yaml`` in *config_dir* holding *cforch_cfg* under ``cforch``."""
    target = config_dir / "label_tool.yaml"
    serialized = yaml.dump({"cforch": cforch_cfg})
    target.write_text(serialized, encoding="utf-8")
def _write_tasks_yaml(path: Path, tasks: list[dict]) -> None:
    """Write *tasks* to *path* as YAML under a top-level ``tasks`` key."""
    document = {"tasks": tasks}
    serialized = yaml.dump(document)
    path.write_text(serialized, encoding="utf-8")
def _write_models_yaml(path: Path, models: list[dict]) -> None:
    """Write *models* to *path* as YAML under a top-level ``models`` key."""
    document = {"models": models}
    serialized = yaml.dump(document)
    path.write_text(serialized, encoding="utf-8")
# ── GET /tasks ─────────────────────────────────────────────────────────────────
def test_tasks_returns_empty_when_not_configured(client):
    """Without any config file, /tasks answers 200 with empty tasks and types."""
    response = client.get("/api/cforch/tasks")

    assert response.status_code == 200
    assert response.json() == {"tasks": [], "types": []}
def test_tasks_parses_yaml(client, config_dir, tmp_path):
    """Tasks written to the configured YAML file come back in order, with their types."""
    expected_tasks = [
        {"id": "t1", "name": "Task One", "type": "instruction"},
        {"id": "t2", "name": "Task Two", "type": "reasoning"},
    ]
    tasks_file = tmp_path / "bench_tasks.yaml"
    _write_tasks_yaml(tasks_file, expected_tasks)
    _write_config(config_dir, {"bench_tasks": str(tasks_file)})

    response = client.get("/api/cforch/tasks")

    assert response.status_code == 200
    body = response.json()
    assert len(body["tasks"]) == 2
    # Each returned task must match the written one at the same position.
    for position, expected in enumerate(expected_tasks):
        assert body["tasks"][position] == expected
    for task_type in ("instruction", "reasoning"):
        assert task_type in body["types"]
def test_tasks_returns_types_deduplicated(client, config_dir, tmp_path):
    """Multiple tasks sharing a type — types list must not duplicate.

    Three tasks are written, two of which share the ``instruction`` type;
    the response's ``types`` list must contain that type exactly once and
    hold two entries in total.
    """
    tasks_file = tmp_path / "bench_tasks.yaml"
    _write_tasks_yaml(tasks_file, [
        {"id": "t1", "name": "A", "type": "instruction"},
        {"id": "t2", "name": "B", "type": "instruction"},
        {"id": "t3", "name": "C", "type": "reasoning"},
    ])
    _write_config(config_dir, {"bench_tasks": str(tasks_file)})

    r = client.get("/api/cforch/tasks")

    # Check the status first (as the sibling tests do) so a failed request is
    # reported as such rather than as a KeyError on the payload below.
    assert r.status_code == 200
    data = r.json()
    assert data["types"].count("instruction") == 1
    assert len(data["types"]) == 2
# ── GET /models ────────────────────────────────────────────────────────────────
def test_models_returns_empty_when_not_configured(client):
    """Without any config file, /models answers 200 with an empty model list."""
    response = client.get("/api/cforch/models")

    assert response.status_code == 200
    payload = response.json()
    assert payload == {"models": []}
def test_models_parses_bench_models_yaml(client, config_dir, tmp_path):
    """A single model entry in the configured YAML is returned field-for-field."""
    expected_model = {
        "name": "llama3",
        "id": "llama3:8b",
        "service": "ollama",
        "tags": ["fast", "small"],
        "vram_estimate_mb": 6000,
    }
    models_file = tmp_path / "bench_models.yaml"
    _write_models_yaml(models_file, [expected_model])
    _write_config(config_dir, {"bench_models": str(models_file)})

    response = client.get("/api/cforch/models")

    assert response.status_code == 200
    body = response.json()
    assert len(body["models"]) == 1
    returned = body["models"][0]
    # Compare field by field so extra keys in the response are tolerated.
    for field in ("name", "id", "service", "tags", "vram_estimate_mb"):
        assert returned[field] == expected_model[field]
# ── GET /run ───────────────────────────────────────────────────────────────────
def test_run_returns_409_when_already_running(client):
    """GET /run is rejected with 409 while ``_BENCH_RUNNING`` is set."""
    from app import cforch as cforch_module

    # Pretend a benchmark is already in flight.
    cforch_module._BENCH_RUNNING = True

    response = client.get("/api/cforch/run")

    assert response.status_code == 409
def test_run_returns_error_when_bench_script_not_configured(client):
    """With no config written, /run still answers 200 but the body carries an error event."""
    response = client.get("/api/cforch/run")

    assert response.status_code == 200
    stream_text = response.text
    assert '"type": "error"' in stream_text
    assert "bench_script not configured" in stream_text
def test_run_streams_progress_events(client, config_dir, tmp_path):
    """Mock subprocess — SSE stream emits progress events from stdout.

    ``Popen`` is patched to return a mock whose stdout yields two lines; both
    lines must appear in the response body alongside a progress event marker.
    """
    bench_script = tmp_path / "fake_benchmark.py"
    bench_script.write_text("# fake", encoding="utf-8")

    # Reuse the module's YAML helpers rather than repeating yaml.dump inline.
    tasks_file = tmp_path / "bench_tasks.yaml"
    _write_tasks_yaml(tasks_file, [])
    models_file = tmp_path / "bench_models.yaml"
    _write_models_yaml(models_file, [])

    results_dir = tmp_path / "results"
    results_dir.mkdir()

    _write_config(config_dir, {
        "bench_script": str(bench_script),
        "bench_tasks": str(tasks_file),
        "bench_models": str(models_file),
        "results_dir": str(results_dir),
        "python_bin": "/usr/bin/python3",
    })

    mock_proc = MagicMock()
    mock_proc.stdout = iter(["Running task 1\n", "Running task 2\n"])
    mock_proc.returncode = 1  # non-zero so we don't need summary.json
    # A MagicMock accepts any call signature (e.g. wait(timeout=...)), unlike a
    # hand-written zero-argument stub; it still returns None like the stub did.
    mock_proc.wait = MagicMock(return_value=None)

    with patch("app.cforch._subprocess.Popen", return_value=mock_proc):
        r = client.get("/api/cforch/run")

    assert r.status_code == 200
    assert '"type": "progress"' in r.text
    assert "Running task 1" in r.text
    assert "Running task 2" in r.text
def test_run_emits_result_on_success(client, config_dir, tmp_path):
    """Exit code 0 plus a pre-written summary.json — the stream carries result and complete events."""
    # Fake benchmark script and empty task/model definitions.
    script_path = tmp_path / "fake_benchmark.py"
    script_path.write_text("# fake", encoding="utf-8")

    tasks_path = tmp_path / "bench_tasks.yaml"
    empty_tasks = yaml.dump({"tasks": []})
    tasks_path.write_text(empty_tasks, encoding="utf-8")

    models_path = tmp_path / "bench_models.yaml"
    empty_models = yaml.dump({"models": []})
    models_path.write_text(empty_models, encoding="utf-8")

    # One run directory already holding a summary for the endpoint to find.
    results_root = tmp_path / "results"
    run_folder = results_root / "2026-04-08-120000"
    run_folder.mkdir(parents=True)
    summary_text = json.dumps({"score": 0.92, "models_evaluated": 3})
    (run_folder / "summary.json").write_text(summary_text, encoding="utf-8")

    _write_config(
        config_dir,
        {
            "bench_script": str(script_path),
            "bench_tasks": str(tasks_path),
            "bench_models": str(models_path),
            "results_dir": str(results_root),
            "python_bin": "/usr/bin/python3",
        },
    )

    # Subprocess stand-in: no output lines, successful exit.
    fake_process = MagicMock()
    fake_process.stdout = iter([])
    fake_process.returncode = 0
    fake_process.wait = MagicMock()

    with patch("app.cforch._subprocess.Popen", return_value=fake_process):
        response = client.get("/api/cforch/run")

    assert response.status_code == 200
    stream_text = response.text
    assert '"type": "result"' in stream_text
    assert '"score": 0.92' in stream_text
    assert '"type": "complete"' in stream_text
# ── GET /results ───────────────────────────────────────────────────────────────
def test_results_returns_404_when_no_results(client):
    """With no results_dir configured, /results answers 404."""
    response = client.get("/api/cforch/results")

    assert response.status_code == 404
def test_results_returns_latest_summary(client, config_dir, tmp_path):
    """A single run directory with summary.json — /results returns its contents."""
    results_root = tmp_path / "results"
    run_folder = results_root / "2026-04-08-150000"
    run_folder.mkdir(parents=True)

    summary_text = json.dumps({"score": 0.88, "run": "test"})
    (run_folder / "summary.json").write_text(summary_text, encoding="utf-8")
    _write_config(config_dir, {"results_dir": str(results_root)})

    response = client.get("/api/cforch/results")

    assert response.status_code == 200
    body = response.json()
    assert body["score"] == 0.88
    assert body["run"] == "test"
# ── POST /cancel ───────────────────────────────────────────────────────────────
def test_cancel_returns_404_when_not_running(client):
    """POST /cancel with the running flag cleared by the autouse fixture answers 404."""
    response = client.post("/api/cforch/cancel")

    assert response.status_code == 404
def test_cancel_terminates_running_benchmark(client):
    """POST /cancel with a process registered — it is terminated and module state is cleared."""
    from app import cforch as cforch_module

    # Register a stand-in process as the currently running benchmark.
    fake_process = MagicMock()
    cforch_module._BENCH_RUNNING = True
    cforch_module._bench_proc = fake_process

    response = client.post("/api/cforch/cancel")

    assert response.status_code == 200
    assert response.json() == {"status": "cancelled"}
    fake_process.terminate.assert_called_once()
    # The endpoint must also reset both running-state globals.
    assert cforch_module._BENCH_RUNNING is False
    assert cforch_module._bench_proc is None