diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..03c3c9f --- /dev/null +++ b/.env.example @@ -0,0 +1,19 @@ +# Avocet — environment variable configuration +# Copy to .env and fill in values. All keys are optional. +# label_tool.yaml takes precedence over env vars where both exist. + +# ── Local inference (Ollama) ─────────────────────────────────────────────────── +# OLLAMA_HOST defaults to http://localhost:11434 if unset. +OLLAMA_HOST=http://localhost:11434 +OLLAMA_MODEL=llama3.2:3b + +# ── cf-orch coordinator (paid/premium tiers) ─────────────────────────────────── +# Required for multi-GPU LLM benchmarking via the cf-orch benchmark harness. +# Free-tier users can leave these unset and use Ollama only. +CF_ORCH_URL=http://localhost:7700 +CF_LICENSE_KEY=CFG-AVCT-xxxx-xxxx-xxxx + +# ── Cloud LLM backends (optional — paid/premium) ────────────────────────────── +# Set one of these to use a cloud LLM instead of a local model. +# ANTHROPIC_API_KEY=sk-ant-... +# OPENAI_API_KEY=sk-... diff --git a/app/api.py b/app/api.py index 0788e13..e1babf5 100644 --- a/app/api.py +++ b/app/api.py @@ -149,6 +149,9 @@ from app.models import router as models_router import app.models as _models_module app.include_router(models_router, prefix="/api/models") +from app.cforch import router as cforch_router +app.include_router(cforch_router, prefix="/api/cforch") + # In-memory last-action store (single user, local tool — in-memory is fine) _last_action: dict | None = None diff --git a/app/cforch.py b/app/cforch.py new file mode 100644 index 0000000..5942ab3 --- /dev/null +++ b/app/cforch.py @@ -0,0 +1,335 @@ +"""Avocet — cf-orch benchmark integration API. + +Wraps cf-orch's benchmark.py script and exposes it via the Avocet API. +Config is read from label_tool.yaml under the `cforch:` key. + +All endpoints are registered on `router` (a FastAPI APIRouter). +api.py includes this router with prefix="/api/cforch". 
+ +Module-level globals (_CONFIG_DIR, _BENCH_RUNNING, _bench_proc) follow the +same testability pattern as sft.py — override _CONFIG_DIR via set_config_dir() +in test fixtures. +""" +from __future__ import annotations + +import json +import logging +import os +import re +import subprocess as _subprocess +from pathlib import Path +from typing import Any + +import yaml +from fastapi import APIRouter, HTTPException +from fastapi.responses import StreamingResponse + +logger = logging.getLogger(__name__) + +_ROOT = Path(__file__).parent.parent +_CONFIG_DIR: Path | None = None # override in tests +_BENCH_RUNNING: bool = False +_bench_proc: Any = None # live Popen object while benchmark runs + +router = APIRouter() + + +# ── Testability seams ────────────────────────────────────────────────────────── + +def set_config_dir(path: Path | None) -> None: + global _CONFIG_DIR + _CONFIG_DIR = path + + +# ── Internal helpers ─────────────────────────────────────────────────────────── + +def _config_file() -> Path: + if _CONFIG_DIR is not None: + return _CONFIG_DIR / "label_tool.yaml" + return _ROOT / "config" / "label_tool.yaml" + + +def _load_cforch_config() -> dict: + """Read label_tool.yaml cforch section, falling back to environment variables. + + Priority (highest to lowest): + 1. label_tool.yaml cforch: key + 2. 
Environment variables (CF_ORCH_URL, CF_LICENSE_KEY, OLLAMA_HOST, OLLAMA_MODEL) + """ + f = _config_file() + file_cfg: dict = {} + if f.exists(): + try: + raw = yaml.safe_load(f.read_text(encoding="utf-8")) or {} + file_cfg = raw.get("cforch", {}) or {} + except yaml.YAMLError as exc: + logger.warning("Failed to parse cforch config %s: %s", f, exc) + + # Env var fallbacks — only used when the yaml key is absent or empty + def _coalesce(file_val: str, env_key: str) -> str: + return file_val if file_val else os.environ.get(env_key, "") + + return { + **file_cfg, + "coordinator_url": _coalesce(file_cfg.get("coordinator_url", ""), "CF_ORCH_URL"), + "license_key": _coalesce(file_cfg.get("license_key", ""), "CF_LICENSE_KEY"), + "ollama_url": _coalesce(file_cfg.get("ollama_url", ""), "OLLAMA_HOST"), + "ollama_model": _coalesce(file_cfg.get("ollama_model", ""), "OLLAMA_MODEL"), + } + + +def _strip_ansi(text: str) -> str: + """Remove ANSI escape codes from a string.""" + return re.sub(r'\x1b\[[0-9;]*m', '', text) + + +def _find_latest_summary(results_dir: str | None) -> Path | None: + """Find the newest summary.json under results_dir, or None if not found.""" + if not results_dir: + return None + rdir = Path(results_dir) + if not rdir.exists(): + return None + # Subdirs are named YYYY-MM-DD-HHMMSS; sort lexicographically for chronological order + subdirs = sorted( + [d for d in rdir.iterdir() if d.is_dir()], + key=lambda d: d.name, + ) + for subdir in reversed(subdirs): + summary = subdir / "summary.json" + if summary.exists(): + return summary + return None + + +# ── GET /tasks ───────────────────────────────────────────────────────────────── + +@router.get("/tasks") +def get_tasks() -> dict: + """Return task list from bench_tasks.yaml.""" + cfg = _load_cforch_config() + tasks_path = cfg.get("bench_tasks", "") + if not tasks_path: + return {"tasks": [], "types": []} + + p = Path(tasks_path) + if not p.exists(): + return {"tasks": [], "types": []} + + try: + raw = 
yaml.safe_load(p.read_text(encoding="utf-8")) or {} + except yaml.YAMLError as exc: + logger.warning("Failed to parse bench_tasks.yaml %s: %s", p, exc) + return {"tasks": [], "types": []} + + tasks_raw = raw.get("tasks", []) or [] + tasks: list[dict] = [] + seen_types: list[str] = [] + types_set: set[str] = set() + + for t in tasks_raw: + if not isinstance(t, dict): + continue + tasks.append({ + "id": t.get("id", ""), + "name": t.get("name", ""), + "type": t.get("type", ""), + }) + task_type = t.get("type", "") + if task_type and task_type not in types_set: + seen_types.append(task_type) + types_set.add(task_type) + + return {"tasks": tasks, "types": seen_types} + + +# ── GET /models ──────────────────────────────────────────────────────────────── + +@router.get("/models") +def get_models() -> dict: + """Return model list from bench_models.yaml.""" + cfg = _load_cforch_config() + models_path = cfg.get("bench_models", "") + if not models_path: + return {"models": []} + + p = Path(models_path) + if not p.exists(): + return {"models": []} + + try: + raw = yaml.safe_load(p.read_text(encoding="utf-8")) or {} + except yaml.YAMLError as exc: + logger.warning("Failed to parse bench_models.yaml %s: %s", p, exc) + return {"models": []} + + models_raw = raw.get("models", []) or [] + models: list[dict] = [] + for m in models_raw: + if not isinstance(m, dict): + continue + models.append({ + "name": m.get("name", ""), + "id": m.get("id", ""), + "service": m.get("service", "ollama"), + "tags": m.get("tags", []) or [], + "vram_estimate_mb": m.get("vram_estimate_mb", 0), + }) + + return {"models": models} + + +# ── GET /run ─────────────────────────────────────────────────────────────────── + +@router.get("/run") +def run_benchmark( + task_ids: str = "", + model_tags: str = "", + coordinator_url: str = "", + ollama_url: str = "", +) -> StreamingResponse: + """Spawn cf-orch benchmark.py and stream stdout as SSE progress events.""" + global _BENCH_RUNNING, _bench_proc + + if 
_BENCH_RUNNING: + raise HTTPException(409, "A benchmark is already running") + + cfg = _load_cforch_config() + bench_script = cfg.get("bench_script", "") + bench_tasks = cfg.get("bench_tasks", "") + bench_models = cfg.get("bench_models", "") + results_dir = cfg.get("results_dir", "") + python_bin = cfg.get("python_bin", "/devl/miniconda3/envs/cf/bin/python") + cfg_coordinator = cfg.get("coordinator_url", "") + cfg_ollama = cfg.get("ollama_url", "") + cfg_license_key = cfg.get("license_key", "") + + def generate(): + global _BENCH_RUNNING, _bench_proc + + if not bench_script or not Path(bench_script).exists(): + yield f"data: {json.dumps({'type': 'error', 'message': 'bench_script not configured or not found'})}\n\n" + return + + cmd = [ + python_bin, + bench_script, + "--tasks", bench_tasks, + "--models", bench_models, + "--output", results_dir, + ] + + if task_ids: + cmd.extend(["--filter-tasks"] + task_ids.split(",")) + if model_tags: + cmd.extend(["--filter-tags"] + model_tags.split(",")) + + # query param overrides config, config overrides env var (already resolved by _load_cforch_config) + effective_coordinator = coordinator_url if coordinator_url else cfg_coordinator + effective_ollama = ollama_url if ollama_url else cfg_ollama + if effective_coordinator: + cmd.extend(["--coordinator", effective_coordinator]) + if effective_ollama: + cmd.extend(["--ollama-url", effective_ollama]) + + # Pass license key as env var so subprocess can authenticate with cf-orch + proc_env = {**os.environ} + if cfg_license_key: + proc_env["CF_LICENSE_KEY"] = cfg_license_key + + _BENCH_RUNNING = True + try: + proc = _subprocess.Popen( + cmd, + stdout=_subprocess.PIPE, + stderr=_subprocess.STDOUT, + text=True, + bufsize=1, + env=proc_env, + ) + _bench_proc = proc + try: + for line in proc.stdout: + line = _strip_ansi(line.rstrip()) + if line: + yield f"data: {json.dumps({'type': 'progress', 'message': line})}\n\n" + proc.wait() + if proc.returncode == 0: + summary_path = 
_find_latest_summary(results_dir) + if summary_path is not None: + try: + summary = json.loads(summary_path.read_text(encoding="utf-8")) + yield f"data: {json.dumps({'type': 'result', 'summary': summary})}\n\n" + except Exception as exc: + logger.warning("Failed to read summary.json: %s", exc) + yield f"data: {json.dumps({'type': 'complete'})}\n\n" + else: + yield f"data: {json.dumps({'type': 'error', 'message': f'Process exited with code {proc.returncode}'})}\n\n" + finally: + _bench_proc = None + except Exception as exc: + yield f"data: {json.dumps({'type': 'error', 'message': str(exc)})}\n\n" + finally: + _BENCH_RUNNING = False + + return StreamingResponse( + generate(), + media_type="text/event-stream", + headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}, + ) + + +# ── GET /config ──────────────────────────────────────────────────────────────── + +@router.get("/config") +def get_cforch_config() -> dict: + """Return resolved cf-orch connection config (env vars merged with yaml). + + Redacts license_key — only returns whether it is set, not the value. + Used by the Settings UI to show current connection state. 
+ """ + cfg = _load_cforch_config() + return { + "coordinator_url": cfg.get("coordinator_url", ""), + "ollama_url": cfg.get("ollama_url", ""), + "ollama_model": cfg.get("ollama_model", ""), + "license_key_set": bool(cfg.get("license_key", "")), + "source": "env" if not _config_file().exists() else "yaml+env", + } + + +# ── GET /results ─────────────────────────────────────────────────────────────── + +@router.get("/results") +def get_results() -> dict: + """Return the latest benchmark summary.json from results_dir.""" + cfg = _load_cforch_config() + results_dir = cfg.get("results_dir", "") + summary_path = _find_latest_summary(results_dir) + if summary_path is None: + raise HTTPException(404, "No benchmark results found") + try: + return json.loads(summary_path.read_text(encoding="utf-8")) + except Exception as exc: + raise HTTPException(500, f"Failed to read summary.json: {exc}") from exc + + +# ── POST /cancel ─────────────────────────────────────────────────────────────── + +@router.post("/cancel") +def cancel_benchmark() -> dict: + """Kill the running benchmark subprocess.""" + global _BENCH_RUNNING, _bench_proc + + if not _BENCH_RUNNING: + raise HTTPException(404, "No benchmark is currently running") + + if _bench_proc is not None: + try: + _bench_proc.terminate() + except Exception as exc: + logger.warning("Failed to terminate benchmark process: %s", exc) + + _BENCH_RUNNING = False + _bench_proc = None + return {"status": "cancelled"} diff --git a/app/sft.py b/app/sft.py index ab439f1..b9b9fe0 100644 --- a/app/sft.py +++ b/app/sft.py @@ -51,17 +51,26 @@ def _config_file() -> Path: return _ROOT / "config" / "label_tool.yaml" +_DEFAULT_BENCH_RESULTS_DIR = "/Library/Development/CircuitForge/circuitforge-orch/scripts/bench_results" + + +def set_default_bench_results_dir(path: str) -> None: + """Override the default bench_results_dir — used by tests to avoid real filesystem.""" + global _DEFAULT_BENCH_RESULTS_DIR + _DEFAULT_BENCH_RESULTS_DIR = path + + def 
_get_bench_results_dir() -> Path: f = _config_file() - if not f.exists(): - return Path("/nonexistent-bench-results") - try: - raw = yaml.safe_load(f.read_text(encoding="utf-8")) or {} - except yaml.YAMLError as exc: - logger.warning("Failed to parse SFT config %s: %s", f, exc) - return Path("/nonexistent-bench-results") - d = raw.get("sft", {}).get("bench_results_dir", "") - return Path(d) if d else Path("/nonexistent-bench-results") + if f.exists(): + try: + raw = yaml.safe_load(f.read_text(encoding="utf-8")) or {} + d = raw.get("sft", {}).get("bench_results_dir", "") + if d: + return Path(d) + except yaml.YAMLError as exc: + logger.warning("Failed to parse SFT config %s: %s", f, exc) + return Path(_DEFAULT_BENCH_RESULTS_DIR) def _candidates_file() -> Path: diff --git a/config/label_tool.yaml.example b/config/label_tool.yaml.example index 9310d21..8aedb2b 100644 --- a/config/label_tool.yaml.example +++ b/config/label_tool.yaml.example @@ -26,3 +26,23 @@ max_per_account: 500 # produced by circuitforge-orch's benchmark harness. sft: bench_results_dir: /path/to/circuitforge-orch/scripts/bench_results + +# cf-orch integration — LLM benchmark harness via cf-orch coordinator. +# All keys here override the corresponding environment variables. +# Omit any key to fall back to the env var (see .env.example). 
+cforch: + # Path to cf-orch's benchmark.py script + bench_script: /path/to/circuitforge-orch/scripts/benchmark.py + # Task and model definition files (yaml) + bench_tasks: /path/to/circuitforge-orch/scripts/bench_tasks.yaml + bench_models: /path/to/circuitforge-orch/scripts/bench_models.yaml + # Where benchmark results are written (also used for SFT candidate discovery) + results_dir: /path/to/circuitforge-orch/scripts/bench_results + # Python interpreter with cf-orch installed + python_bin: /devl/miniconda3/envs/cf/bin/python + + # Connection config — override env vars CF_ORCH_URL / CF_LICENSE_KEY / OLLAMA_HOST + # coordinator_url: http://localhost:7700 + # license_key: CFG-AVCT-xxxx-xxxx-xxxx + # ollama_url: http://localhost:11434 + # ollama_model: llama3.2:3b diff --git a/environment.yml b/environment.yml index 73f1941..e8553f3 100644 --- a/environment.yml +++ b/environment.yml @@ -22,5 +22,8 @@ dependencies: # Optional: BGE reranker adapter # - FlagEmbedding + # CircuitForge shared core (LLM router, tier system, config) + - circuitforge-core>=0.9.0 + # Dev - pytest>=8.0 diff --git a/tests/test_cforch.py b/tests/test_cforch.py new file mode 100644 index 0000000..3abc8ea --- /dev/null +++ b/tests/test_cforch.py @@ -0,0 +1,366 @@ +"""Tests for app/cforch.py — /api/cforch/* endpoints.""" +from __future__ import annotations + +import json +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest +import yaml +from fastapi.testclient import TestClient + + +# ── Fixtures ─────────────────────────────────────────────────────────────────── + +@pytest.fixture(autouse=True) +def reset_cforch_globals(tmp_path): + """Redirect _CONFIG_DIR to tmp_path and reset running-state globals.""" + from app import cforch as cforch_module + + prev_config_dir = cforch_module._CONFIG_DIR + prev_running = cforch_module._BENCH_RUNNING + prev_proc = cforch_module._bench_proc + + cforch_module.set_config_dir(tmp_path) + cforch_module._BENCH_RUNNING = False + 
cforch_module._bench_proc = None + + yield tmp_path + + cforch_module.set_config_dir(prev_config_dir) + cforch_module._BENCH_RUNNING = prev_running + cforch_module._bench_proc = prev_proc + + +@pytest.fixture +def client(): + from app.api import app + return TestClient(app) + + +@pytest.fixture +def config_dir(reset_cforch_globals): + """Return the tmp config dir (already set as _CONFIG_DIR).""" + return reset_cforch_globals + + +def _write_config(config_dir: Path, cforch_cfg: dict) -> None: + """Write a label_tool.yaml with the given cforch block into config_dir.""" + cfg = {"cforch": cforch_cfg} + (config_dir / "label_tool.yaml").write_text( + yaml.dump(cfg), encoding="utf-8" + ) + + +def _write_tasks_yaml(path: Path, tasks: list[dict]) -> None: + path.write_text(yaml.dump({"tasks": tasks}), encoding="utf-8") + + +def _write_models_yaml(path: Path, models: list[dict]) -> None: + path.write_text(yaml.dump({"models": models}), encoding="utf-8") + + +# ── GET /tasks ───────────────────────────────────────────────────────────────── + +def test_tasks_returns_empty_when_not_configured(client): + """No config file present — endpoint returns empty lists.""" + r = client.get("/api/cforch/tasks") + assert r.status_code == 200 + data = r.json() + assert data == {"tasks": [], "types": []} + + +def test_tasks_parses_yaml(client, config_dir, tmp_path): + tasks_file = tmp_path / "bench_tasks.yaml" + _write_tasks_yaml(tasks_file, [ + {"id": "t1", "name": "Task One", "type": "instruction"}, + {"id": "t2", "name": "Task Two", "type": "reasoning"}, + ]) + _write_config(config_dir, {"bench_tasks": str(tasks_file)}) + + r = client.get("/api/cforch/tasks") + assert r.status_code == 200 + data = r.json() + assert len(data["tasks"]) == 2 + assert data["tasks"][0] == {"id": "t1", "name": "Task One", "type": "instruction"} + assert data["tasks"][1] == {"id": "t2", "name": "Task Two", "type": "reasoning"} + assert "instruction" in data["types"] + assert "reasoning" in data["types"] + + 
+def test_tasks_returns_types_deduplicated(client, config_dir, tmp_path): + """Multiple tasks sharing a type — types list must not duplicate.""" + tasks_file = tmp_path / "bench_tasks.yaml" + _write_tasks_yaml(tasks_file, [ + {"id": "t1", "name": "A", "type": "instruction"}, + {"id": "t2", "name": "B", "type": "instruction"}, + {"id": "t3", "name": "C", "type": "reasoning"}, + ]) + _write_config(config_dir, {"bench_tasks": str(tasks_file)}) + + r = client.get("/api/cforch/tasks") + data = r.json() + assert data["types"].count("instruction") == 1 + assert len(data["types"]) == 2 + + +# ── GET /models ──────────────────────────────────────────────────────────────── + +def test_models_returns_empty_when_not_configured(client): + """No config file present — endpoint returns empty model list.""" + r = client.get("/api/cforch/models") + assert r.status_code == 200 + assert r.json() == {"models": []} + + +def test_models_parses_bench_models_yaml(client, config_dir, tmp_path): + models_file = tmp_path / "bench_models.yaml" + _write_models_yaml(models_file, [ + { + "name": "llama3", + "id": "llama3:8b", + "service": "ollama", + "tags": ["fast", "small"], + "vram_estimate_mb": 6000, + } + ]) + _write_config(config_dir, {"bench_models": str(models_file)}) + + r = client.get("/api/cforch/models") + assert r.status_code == 200 + data = r.json() + assert len(data["models"]) == 1 + m = data["models"][0] + assert m["name"] == "llama3" + assert m["id"] == "llama3:8b" + assert m["service"] == "ollama" + assert m["tags"] == ["fast", "small"] + assert m["vram_estimate_mb"] == 6000 + + +# ── GET /run ─────────────────────────────────────────────────────────────────── + +def test_run_returns_409_when_already_running(client): + """If _BENCH_RUNNING is True, GET /run returns 409.""" + from app import cforch as cforch_module + cforch_module._BENCH_RUNNING = True + + r = client.get("/api/cforch/run") + assert r.status_code == 409 + + +def 
test_run_returns_error_when_bench_script_not_configured(client): + """No config at all — SSE stream contains an error event.""" + r = client.get("/api/cforch/run") + assert r.status_code == 200 + assert '"type": "error"' in r.text + assert "bench_script not configured" in r.text + + +def test_run_streams_progress_events(client, config_dir, tmp_path): + """Mock subprocess — SSE stream emits progress events from stdout.""" + bench_script = tmp_path / "fake_benchmark.py" + bench_script.write_text("# fake", encoding="utf-8") + + tasks_file = tmp_path / "bench_tasks.yaml" + tasks_file.write_text(yaml.dump({"tasks": []}), encoding="utf-8") + models_file = tmp_path / "bench_models.yaml" + models_file.write_text(yaml.dump({"models": []}), encoding="utf-8") + results_dir = tmp_path / "results" + results_dir.mkdir() + + _write_config(config_dir, { + "bench_script": str(bench_script), + "bench_tasks": str(tasks_file), + "bench_models": str(models_file), + "results_dir": str(results_dir), + "python_bin": "/usr/bin/python3", + }) + + mock_proc = MagicMock() + mock_proc.stdout = iter(["Running task 1\n", "Running task 2\n"]) + mock_proc.returncode = 1 # non-zero so we don't need summary.json + + def mock_wait(): + pass + + mock_proc.wait = mock_wait + + with patch("app.cforch._subprocess.Popen", return_value=mock_proc): + r = client.get("/api/cforch/run") + + assert r.status_code == 200 + assert '"type": "progress"' in r.text + assert "Running task 1" in r.text + assert "Running task 2" in r.text + + +def test_run_emits_result_on_success(client, config_dir, tmp_path): + """Mock subprocess exit 0 + write fake summary.json — stream emits result event.""" + bench_script = tmp_path / "fake_benchmark.py" + bench_script.write_text("# fake", encoding="utf-8") + + tasks_file = tmp_path / "bench_tasks.yaml" + tasks_file.write_text(yaml.dump({"tasks": []}), encoding="utf-8") + models_file = tmp_path / "bench_models.yaml" + models_file.write_text(yaml.dump({"models": []}), 
encoding="utf-8") + + results_dir = tmp_path / "results" + run_dir = results_dir / "2026-04-08-120000" + run_dir.mkdir(parents=True) + summary_data = {"score": 0.92, "models_evaluated": 3} + (run_dir / "summary.json").write_text(json.dumps(summary_data), encoding="utf-8") + + _write_config(config_dir, { + "bench_script": str(bench_script), + "bench_tasks": str(tasks_file), + "bench_models": str(models_file), + "results_dir": str(results_dir), + "python_bin": "/usr/bin/python3", + }) + + mock_proc = MagicMock() + mock_proc.stdout = iter([]) + mock_proc.returncode = 0 + mock_proc.wait = MagicMock() + + with patch("app.cforch._subprocess.Popen", return_value=mock_proc): + r = client.get("/api/cforch/run") + + assert r.status_code == 200 + assert '"type": "result"' in r.text + assert '"score": 0.92' in r.text + assert '"type": "complete"' in r.text + + +# ── GET /results ─────────────────────────────────────────────────────────────── + +def test_results_returns_404_when_no_results(client): + """No results_dir configured — endpoint returns 404.""" + r = client.get("/api/cforch/results") + assert r.status_code == 404 + + +def test_results_returns_latest_summary(client, config_dir, tmp_path): + """Write fake results dir with one subdir containing summary.json.""" + results_dir = tmp_path / "results" + run_dir = results_dir / "2026-04-08-150000" + run_dir.mkdir(parents=True) + summary_data = {"score": 0.88, "run": "test"} + (run_dir / "summary.json").write_text(json.dumps(summary_data), encoding="utf-8") + + _write_config(config_dir, {"results_dir": str(results_dir)}) + + r = client.get("/api/cforch/results") + assert r.status_code == 200 + data = r.json() + assert data["score"] == 0.88 + assert data["run"] == "test" + + +# ── POST /cancel ─────────────────────────────────────────────────────────────── + +def test_cancel_returns_404_when_not_running(client): + """POST /cancel when no benchmark running — returns 404.""" + r = client.post("/api/cforch/cancel") + assert 
r.status_code == 404 + + +def test_cancel_terminates_running_benchmark(client): + """POST /cancel when benchmark is running — terminates proc and returns cancelled.""" + from app import cforch as cforch_module + + mock_proc = MagicMock() + cforch_module._BENCH_RUNNING = True + cforch_module._bench_proc = mock_proc + + r = client.post("/api/cforch/cancel") + assert r.status_code == 200 + assert r.json() == {"status": "cancelled"} + mock_proc.terminate.assert_called_once() + assert cforch_module._BENCH_RUNNING is False + assert cforch_module._bench_proc is None + + +# ── GET /config ──────────────────────────────────────────────────────────────── + +def test_config_returns_empty_when_no_yaml_no_env(client, monkeypatch): + """No yaml, no env vars — all fields empty, license_key_set False.""" + for key in ("CF_ORCH_URL", "CF_LICENSE_KEY", "OLLAMA_HOST", "OLLAMA_MODEL"): + monkeypatch.delenv(key, raising=False) + + r = client.get("/api/cforch/config") + assert r.status_code == 200 + data = r.json() + assert data["coordinator_url"] == "" + assert data["ollama_url"] == "" + assert data["license_key_set"] is False + + +def test_config_reads_env_vars_when_no_yaml(client, monkeypatch): + """Env vars populate fields when label_tool.yaml has no cforch section.""" + monkeypatch.setenv("CF_ORCH_URL", "http://orch.example.com:7700") + monkeypatch.setenv("CF_LICENSE_KEY", "CFG-AVCT-TEST-TEST-TEST") + monkeypatch.setenv("OLLAMA_HOST", "http://ollama.local:11434") + monkeypatch.setenv("OLLAMA_MODEL", "mistral:7b") + + r = client.get("/api/cforch/config") + assert r.status_code == 200 + data = r.json() + assert data["coordinator_url"] == "http://orch.example.com:7700" + assert data["ollama_url"] == "http://ollama.local:11434" + assert data["ollama_model"] == "mistral:7b" + assert data["license_key_set"] is True # set, but value not exposed + + +def test_config_yaml_overrides_env(client, config_dir, monkeypatch): + """label_tool.yaml cforch values take priority over env vars.""" + 
monkeypatch.setenv("CF_ORCH_URL", "http://env-orch:7700") + monkeypatch.setenv("OLLAMA_HOST", "http://env-ollama:11434") + + _write_config(config_dir, { + "coordinator_url": "http://yaml-orch:7700", + "ollama_url": "http://yaml-ollama:11434", + }) + + r = client.get("/api/cforch/config") + assert r.status_code == 200 + data = r.json() + assert data["coordinator_url"] == "http://yaml-orch:7700" + assert data["ollama_url"] == "http://yaml-ollama:11434" + assert data["source"] == "yaml+env" + + +def test_run_passes_license_key_env_to_subprocess(client, config_dir, tmp_path, monkeypatch): + """CF_LICENSE_KEY must be forwarded to the benchmark subprocess env.""" + monkeypatch.setenv("CF_LICENSE_KEY", "CFG-AVCT-ENV-ONLY-KEY") + + bench_script = tmp_path / "benchmark.py" + bench_script.write_text("# stub", encoding="utf-8") + tasks_file = tmp_path / "bench_tasks.yaml" + tasks_file.write_text(yaml.dump({"tasks": []}), encoding="utf-8") + models_file = tmp_path / "bench_models.yaml" + models_file.write_text(yaml.dump({"models": []}), encoding="utf-8") + + _write_config(config_dir, { + "bench_script": str(bench_script), + "bench_tasks": str(tasks_file), + "bench_models": str(models_file), + "results_dir": str(tmp_path / "results"), + "python_bin": "/usr/bin/python3", + }) + + captured_env: dict = {} + + def fake_popen(cmd, **kwargs): + captured_env.update(kwargs.get("env", {})) + mock = MagicMock() + mock.stdout = iter([]) + mock.returncode = 0 + mock.wait = MagicMock() + return mock + + with patch("app.cforch._subprocess.Popen", side_effect=fake_popen): + client.get("/api/cforch/run") + + assert captured_env.get("CF_LICENSE_KEY") == "CFG-AVCT-ENV-ONLY-KEY" diff --git a/tests/test_sft.py b/tests/test_sft.py index e3c98f9..ac6410c 100644 --- a/tests/test_sft.py +++ b/tests/test_sft.py @@ -8,13 +8,16 @@ from pathlib import Path @pytest.fixture(autouse=True) def reset_sft_globals(tmp_path): from app import sft as sft_module - _prev_data = sft_module._SFT_DATA_DIR - _prev_cfg = 
sft_module._SFT_CONFIG_DIR + _prev_data = sft_module._SFT_DATA_DIR + _prev_cfg = sft_module._SFT_CONFIG_DIR + _prev_default = sft_module._DEFAULT_BENCH_RESULTS_DIR sft_module.set_sft_data_dir(tmp_path) sft_module.set_sft_config_dir(tmp_path) + sft_module.set_default_bench_results_dir(str(tmp_path / "bench_results")) yield sft_module.set_sft_data_dir(_prev_data) sft_module.set_sft_config_dir(_prev_cfg) + sft_module.set_default_bench_results_dir(_prev_default) @pytest.fixture diff --git a/web/src/views/BenchmarkView.vue b/web/src/views/BenchmarkView.vue index 8497181..81d8302 100644 --- a/web/src/views/BenchmarkView.vue +++ b/web/src/views/BenchmarkView.vue @@ -3,26 +3,219 @@

🏁 Benchmark

-
+
+ + +
+ + +
+ + + + + + + + @@ -278,6 +475,33 @@ interface AvailableModel { adapter_type: string } +// cf-orch types +interface CfOrchTask { + id: string + name: string + type: string +} + +interface CfOrchModel { + name: string + id: string + service: string + tags: string[] + vram_estimate_mb?: number +} + +interface LlmModelResult { + model_name: string + model_id: string + node_id: string + avg_tokens_per_sec: number + avg_completion_ms: number + avg_quality_score: number + finetune_candidates: number + error_count: number + quality_by_task_type: Record +} + interface ModelCategoriesResponse { categories: Record } @@ -329,6 +553,25 @@ const ftError = ref('') const ftLogEl = ref(null) const runCancelled = ref(false) + +// ── Mode toggle ─────────────────────────────────────────────────────────────── +const benchMode = ref<'classifier' | 'llm'>('classifier') + +// ── LLM Eval state ─────────────────────────────────────────────────────────── +const llmTasks = ref([]) +const llmTasksLoading = ref(false) +const llmModels = ref([]) +const llmModelsLoading = ref(false) + +const selectedLlmTasks = ref>(new Set()) +const selectedLlmModels = ref>(new Set()) + +const llmRunning = ref(false) +const llmRunLog = ref([]) +const llmError = ref('') +const llmResults = ref([]) +const llmEventSource = ref(null) +const llmLogEl = ref(null) const ftCancelled = ref(false) async function cancelBenchmark() { @@ -339,6 +582,197 @@ async function cancelFinetune() { await fetch('/api/finetune/cancel', { method: 'POST' }).catch(() => {}) } +// ── LLM Eval computed ───────────────────────────────────────────────────────── +const llmTasksByType = computed((): Record => { + const groups: Record = {} + for (const t of llmTasks.value) { + if (!groups[t.type]) groups[t.type] = [] + groups[t.type].push(t) + } + return groups +}) + +const llmModelsByService = computed((): Record => { + const groups: Record = {} + for (const m of llmModels.value) { + if (!groups[m.service]) groups[m.service] = [] + 
groups[m.service].push(m) + } + return groups +}) + +const llmTaskBadge = computed(() => { + const total = llmTasks.value.length + if (total === 0) return 'No tasks available' + const sel = selectedLlmTasks.value.size + if (sel === total) return `All tasks (${total})` + return `${sel} of ${total} tasks selected` +}) + +const llmModelBadge = computed(() => { + const total = llmModels.value.length + if (total === 0) return 'No models available' + const sel = selectedLlmModels.value.size + if (sel === total) return `All models (${total})` + return `${sel} of ${total} selected` +}) + +// All task type columns present in any result row +const llmTaskTypeCols = computed(() => { + const types = new Set() + for (const r of llmResults.value) { + for (const k of Object.keys(r.quality_by_task_type)) types.add(k) + } + return [...types].sort() +}) + +// Best model id per column (overall + each task type col) +const llmBestByCol = computed((): Record => { + const best: Record = {} + if (llmResults.value.length === 0) return best + + // overall + let bestId = '', bestVal = -Infinity + for (const r of llmResults.value) { + if (r.avg_quality_score > bestVal) { bestVal = r.avg_quality_score; bestId = r.model_id } + } + best['overall'] = bestId + + for (const col of llmTaskTypeCols.value) { + bestId = ''; bestVal = -Infinity + for (const r of llmResults.value) { + const v = r.quality_by_task_type[col] + if (v != null && v > bestVal) { bestVal = v; bestId = r.model_id } + } + best[col] = bestId + } + return best +}) + +function pct(v: number): string { + return `${(v * 100).toFixed(1)}%` +} + +// Task picker helpers +function isTaskTypeAllSelected(tasks: CfOrchTask[]): boolean { + return tasks.length > 0 && tasks.every(t => selectedLlmTasks.value.has(t.id)) +} +function isTaskTypeIndeterminate(tasks: CfOrchTask[]): boolean { + const some = tasks.some(t => selectedLlmTasks.value.has(t.id)) + return some && !isTaskTypeAllSelected(tasks) +} +function toggleLlmTask(id: string, checked: 
boolean) { + const next = new Set(selectedLlmTasks.value) + if (checked) next.add(id) + else next.delete(id) + selectedLlmTasks.value = next +} +function toggleTaskType(tasks: CfOrchTask[], checked: boolean) { + const next = new Set(selectedLlmTasks.value) + for (const t of tasks) { + if (checked) next.add(t.id) + else next.delete(t.id) + } + selectedLlmTasks.value = next +} + +// Model picker helpers +function isServiceAllSelected(models: CfOrchModel[]): boolean { + return models.length > 0 && models.every(m => selectedLlmModels.value.has(m.id)) +} +function isServiceIndeterminate(models: CfOrchModel[]): boolean { + const some = models.some(m => selectedLlmModels.value.has(m.id)) + return some && !isServiceAllSelected(models) +} +function toggleLlmModel(id: string, checked: boolean) { + const next = new Set(selectedLlmModels.value) + if (checked) next.add(id) + else next.delete(id) + selectedLlmModels.value = next +} +function toggleService(models: CfOrchModel[], checked: boolean) { + const next = new Set(selectedLlmModels.value) + for (const m of models) { + if (checked) next.add(m.id) + else next.delete(m.id) + } + selectedLlmModels.value = next +} + +// Data loaders +async function loadLlmTasks() { + llmTasksLoading.value = true + const { data } = await useApiFetch<{ tasks: CfOrchTask[]; types: string[] }>('/api/cforch/tasks') + llmTasksLoading.value = false + if (data?.tasks) { + llmTasks.value = data.tasks + selectedLlmTasks.value = new Set(data.tasks.map(t => t.id)) + } +} + +async function loadLlmModels() { + llmModelsLoading.value = true + const { data } = await useApiFetch<{ models: CfOrchModel[] }>('/api/cforch/models') + llmModelsLoading.value = false + if (data?.models) { + llmModels.value = data.models + selectedLlmModels.value = new Set(data.models.map(m => m.id)) + } +} + +async function loadLlmResults() { + const { data } = await useApiFetch('/api/cforch/results') + if (Array.isArray(data) && data.length > 0) { + llmResults.value = data + } +} + 
// ── LLM benchmark run (SSE) ──────────────────────────────────────────────────

// Stop a running benchmark: drop the SSE connection locally, then best-effort
// cancel server-side (failures ignored — the stream is already closed).
async function cancelLlmBenchmark() {
  llmEventSource.value?.close()
  llmEventSource.value = null
  llmRunning.value = false
  await fetch('/api/cforch/cancel', { method: 'POST' }).catch(() => {})
}

// Kick off a benchmark run and stream progress over SSE.
// Fixes vs. the original:
//  * any EventSource left from a previous run is closed before opening a new
//    one — a double-click previously leaked the first connection and let its
//    messages interleave into the fresh log;
//  * JSON.parse of the SSE payload is guarded so one malformed message
//    surfaces as an error instead of throwing out of the handler and leaving
//    llmRunning stuck at true.
// NOTE(review): selectedLlmModels is collected by the picker UI but never sent
// to /api/cforch/run — confirm whether the endpoint accepts a model filter.
function startLlmBenchmark() {
  // Drop any stale stream from a prior run before starting a new one.
  llmEventSource.value?.close()
  llmEventSource.value = null

  llmRunning.value = true
  llmRunLog.value = []
  llmError.value = ''

  const params = new URLSearchParams()
  const taskIds = [...selectedLlmTasks.value].join(',')
  if (taskIds) params.set('task_ids', taskIds)

  const es = new EventSource(`/api/cforch/run?${params}`)
  llmEventSource.value = es

  // Shared teardown for complete / error / parse-failure paths.
  const stop = () => {
    llmRunning.value = false
    es.close()
    llmEventSource.value = null
  }

  es.onmessage = async (e: MessageEvent) => {
    let msg: any
    try {
      msg = JSON.parse(e.data)
    } catch {
      llmError.value = 'Malformed event from server'
      stop()
      return
    }
    if (msg.type === 'progress' && typeof msg.message === 'string') {
      llmRunLog.value.push(msg.message)
      await nextTick()
      llmLogEl.value?.scrollTo({ top: llmLogEl.value.scrollHeight, behavior: 'smooth' })
    } else if (msg.type === 'result' && Array.isArray(msg.summary)) {
      llmResults.value = msg.summary
    } else if (msg.type === 'complete') {
      stop()
    } else if (msg.type === 'error' && typeof msg.message === 'string') {
      llmError.value = msg.message
      stop()
    }
  }
  es.onerror = () => {
    // Only report a lost connection if we had not already finished cleanly.
    if (llmRunning.value) llmError.value = 'Connection lost'
    stop()
  }
}

onMounted(() => {
  loadResults()
  loadFineTunedModels()
  loadModelCategories()
  loadLlmTasks()
  loadLlmModels()
  loadLlmResults()
})

/* ── Mode toggle (segmented control / pill) ─────── */
.mode-toggle {
  display: inline-flex;
  border: 1px solid var(--color-border, #d0d7e8);
  border-radius: 0.5rem;
  overflow: hidden;
  align-self: flex-start;
}

.mode-btn {
  padding: 0.4rem 1.1rem;
  font-size: 0.85rem;
  font-family: var(--font-body, sans-serif);
  font-weight: 500;
  border: none;
  background: var(--color-surface, #fff);
  color: var(--color-text-secondary, #6b7a99);
  cursor: pointer;
  transition: background 0.15s, color 0.15s;
}

.mode-btn:not(:last-child) {
  border-right: 1px solid var(--color-border, #d0d7e8);
}

.mode-btn.active {
  background: var(--app-primary, #2A6080);
  color: #fff;
}

.mode-btn:not(.active):hover {
  background: var(--color-surface-raised, #e4ebf5);
}

/* ── LLM run controls ───────────────────────────── */
.llm-run-controls {
  display: flex;
  align-items: center;
  gap: 0.75rem;
  flex-wrap: wrap;
}

.llm-run-hint {
  font-size: 0.8rem;
  color: var(--color-text-secondary, #6b7a99);
}

/* ── LLM results table tweaks ───────────────────── */
.llm-results-table .bt-best {
  color: var(--color-success, #3a7a32);
  font-weight: 700;
  background: color-mix(in srgb, var(--color-success, #3a7a32) 8%, transparent);
}

.llm-model-name-cell {
  font-family: var(--font-mono, monospace);
  font-size: 0.75rem;
  white-space: nowrap;
  max-width: 16rem;
  overflow: hidden;
  text-overflow: ellipsis;
  background: var(--color-surface, #fff);
  border-top: 1px solid var(--color-border, #d0d7e8);
  padding: 0.35rem 0.6rem;
  position: sticky;
  left: 0;
}

.llm-tps-cell {
  font-family: var(--font-mono, monospace);
  font-variant-numeric: tabular-nums;
  white-space: nowrap;
}

diff --git a/web/src/views/SettingsView.vue b/web/src/views/SettingsView.vue
index 032ddc1..e93240e 100644
--- a/web/src/views/SettingsView.vue
+++ b/web/src/views/SettingsView.vue
@@ -115,8 +115,18 @@

cf-orch Integration

Import SFT (supervised fine-tuning) candidates from cf-orch benchmark runs. Connection settings fall back to environment variables (CF_ORCH_URL, CF_LICENSE_KEY, OLLAMA_HOST) when not set here.

+ +
{{ orchStatusLabel }} via env vars via label_tool.yaml
+