From 9fdaeeb3d63130559ca9d6b6d43c2290d850d062 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Mon, 11 May 2026 09:05:12 -0700 Subject: [PATCH] feat: multi-bench dashboard, API path migration, benchmark reliability fixes - dashboard: eval card now shows last run + score for all bench types (classifier, LLM, style, plans) via new _get_recent_bench_runs() - dashboard: skip cforch LLM-bench list summaries when scanning for classifier best_macro_f1 (fixes _find_latest_classifier_bench) - cforch: stale _BENCH_RUNNING flag now auto-resets if process exited; idle timeout (120s via select) kills hung benchmark if node crashes - api: add /api/finetune/{run,cancel} backward-compat shims while ClassifierTab fine-tune section is migrated to TrainJobsView - ClassifierTab: migrate all /api/benchmark/* paths to /api/cforch/*; fix null-safety on results.models access; load fine-tuned models from /api/train/results instead of /api/finetune/status - CompareTab: extend model picker to include vllm + cf-text alongside ollama, grouped by service; pre-select all LLM_SERVICES on load - LlmEvalTab: null-safety on quality_by_task_type lookups - models: AVOCET_MODELS_DIR env var overrides default models/ path --- app/api.py | 27 ++++++++ app/cforch.py | 24 ++++++- app/dashboard.py | 109 +++++++++++++++++++++++++++++--- app/models.py | 10 ++- web/src/views/ClassifierTab.vue | 16 ++--- web/src/views/CompareTab.vue | 56 ++++++++++------ web/src/views/DashboardView.vue | 109 ++++++++++++++++++++++++-------- web/src/views/LlmEvalTab.vue | 4 +- 8 files changed, 285 insertions(+), 70 deletions(-) diff --git a/app/api.py b/app/api.py index a1334fb..b485f09 100644 --- a/app/api.py +++ b/app/api.py @@ -40,6 +40,33 @@ app.include_router(plans_bench_router, prefix="/api/plans-bench") # In-memory last-action store (single user, local tool — in-memory is fine) _last_action: dict | None = None +# -- Backward-compat shims (ClassifierTab still uses old /api/finetune/* paths) +# Remove once ClassifierTab fine-tune section is migrated to TrainJobsView. + +from fastapi import Query +from fastapi.responses import StreamingResponse as _StreamingResponse + +@app.get("/api/finetune/run") +def finetune_run_compat(model: str = Query(...), epochs: int = Query(5)) -> _StreamingResponse: + """Shim: create a classifier train job and immediately stream it.""" + from app.train.train import create_job, run_job, CreateJobRequest + job = create_job(CreateJobRequest(type="classifier", model_key=model, config_json={"epochs": epochs})) + return run_job(job["id"]) + +@app.post("/api/finetune/cancel") +def finetune_cancel_compat() -> dict: + """Shim: cancel the most recent running classifier job.""" + from app.train.train import _db, _init_db, cancel_job + from fastapi import HTTPException + _init_db() + with _db() as conn: + row = conn.execute( + "SELECT id FROM jobs WHERE type='classifier' AND status='running' ORDER BY started_at DESC LIMIT 1" + ).fetchone() + if row is None: + return {"status": "nothing_running"} + return cancel_job(row["id"]) + from app.dashboard import router as dashboard_router app.include_router(dashboard_router, prefix="/api") diff --git a/app/cforch.py b/app/cforch.py index 60c8221..136205b 100644 --- a/app/cforch.py +++ b/app/cforch.py @@ -16,6 +16,7 @@ import json import logging import os import re +import select as _select import subprocess as _subprocess import tempfile from pathlib import Path @@ -311,8 +312,12 @@ def run_benchmark( """Spawn cf-orch benchmark.py and stream stdout as SSE progress events.""" global _BENCH_RUNNING, _bench_proc + # Check if the process is actually still alive; reset stale flag if not. if _BENCH_RUNNING: - raise HTTPException(409, "A benchmark is already running") + if _bench_proc is not None and _bench_proc.poll() is None: + raise HTTPException(409, "A benchmark is already running") + _BENCH_RUNNING = False + _bench_proc = None cfg = _load_cforch_config() bench_script = cfg.get("bench_script", "") @@ -436,8 +441,23 @@ def run_benchmark( env=proc_env, ) _bench_proc = proc + _IDLE_TIMEOUT_S = 120 # kill if no output for 2 minutes (node crash) try: - for line in proc.stdout: + while True: + ready = _select.select([proc.stdout], [], [], _IDLE_TIMEOUT_S) + if not ready[0]: + # No output for IDLE_TIMEOUT_S — node likely crashed + proc.terminate() + try: + proc.wait(timeout=5) + except _subprocess.TimeoutExpired: + proc.kill() + msg = f"Benchmark timed out — no output for {_IDLE_TIMEOUT_S}s (cluster node may have crashed)" + yield f"data: {json.dumps({'type': 'error', 'message': msg})}\n\n" + break + line = proc.stdout.readline() + if not line: + break line = _strip_ansi(line.rstrip()) if line: yield f"data: {json.dumps({'type': 'progress', 'message': line})}\n\n" diff --git a/app/dashboard.py b/app/dashboard.py index ef30a36..2e47e7b 100644 --- a/app/dashboard.py +++ b/app/dashboard.py @@ -1,17 +1,18 @@ """Avocet -- dashboard aggregate API. GET /api/dashboard returns the current flywheel state: - labeled_since_last_eval -- items labeled after the most recent eval run + labeled_since_last_eval -- items labeled after the most recent bench run last_eval_timestamp -- ISO timestamp of newest bench_results summary last_eval_best_score -- best macro_f1 from that summary active_jobs -- jobs with status queued or running corrections_pending -- sft_candidates with status=needs_review corrections_export_ready -- approved sft candidates with non-blank correction + recent_bench_runs -- most-recent timestamp + score per bench type signals -- computed booleans for UI nudge indicators Thresholds in label_tool.yaml pipeline: section: pipeline: - data_eval_threshold: 50 # labeled items since last eval to trigger nudge + data_eval_threshold: 50 # labeled items since last bench to trigger nudge eval_train_threshold: 0.05 # improvement delta needed before retraining (future) """ from __future__ import annotations @@ -77,7 +78,7 @@ def _load_score_records() -> list[dict]: pass return records -def _find_latest_eval(results_dir_override: str = "") -> tuple[str | None, float | None]: +def _find_latest_classifier_bench(results_dir_override: str = "") -> tuple[str | None, float | None]: """Return (iso_timestamp, best_macro_f1) from the newest bench_results summary. Checks results_dir from cforch config if set, then falls back to @@ -107,6 +108,8 @@ def _find_latest_eval(results_dir_override: str = "") -> tuple[str | None, float if summary.exists(): try: data = json.loads(summary.read_text(encoding="utf-8")) + if not isinstance(data, dict): + continue # cforch LLM-bench summaries are lists; skip ts = data.get("timestamp") or subdir.name score = data.get("best_macro_f1") or data.get("macro_f1") return ts, (float(score) if isinstance(score, (int, float)) else None) @@ -114,6 +117,10 @@ def _find_latest_eval(results_dir_override: str = "") -> tuple[str | None, float logger.warning("Failed to parse summary.json at %s: %s", summary, exc) return None, None +# Keep old name as alias so existing callers in tests still work. +_find_latest_eval = _find_latest_classifier_bench + + def _count_corrections() -> tuple[int, int]: """Return (pending_count, export_ready_count).""" pending = 0 @@ -169,22 +176,106 @@ def _count_labeled_since(since_ts: str | None) -> int: return sum(1 for r in records if r.get("labeled_at", "") > since_ts) +def _get_recent_bench_runs() -> dict: + """Return most-recent run summary for each bench type. + + Each entry: {"timestamp": str|None, "metric": str|None, "score": float|None} + """ + runs: dict[str, dict] = { + "classifier": {"timestamp": None, "metric": "macro_f1", "score": None}, + "llm": {"timestamp": None, "metric": None, "score": None}, + "style": {"timestamp": None, "metric": None, "score": None}, + "plans": {"timestamp": None, "metric": "avg_score", "score": None}, + } + + # ── Classifier: bench_results//summary.json ────────────────────── + clf_ts, clf_score = _find_latest_classifier_bench() + if clf_ts: + runs["classifier"]["timestamp"] = clf_ts + runs["classifier"]["score"] = clf_score + + # ── LLM bench + Style: benchmark_results/ ───────────────────────────── + f = _config_file() + bench_dir: Path | None = None + if f.exists(): + try: + raw = yaml.safe_load(f.read_text(encoding="utf-8")) or {} + rd = (raw.get("cforch", {}) or {}).get("results_dir", "") + if rd: + bench_dir = Path(rd) + except Exception: + pass + if bench_dir is None: + bench_dir = _ROOT / "benchmark_results" + + if bench_dir.exists(): + llm_files = sorted( + [p for p in bench_dir.glob("*.json") if not p.name.startswith("style_")], + key=lambda p: p.stat().st_mtime, reverse=True, + ) + if llm_files: + try: + data = json.loads(llm_files[0].read_text(encoding="utf-8")) + runs["llm"]["timestamp"] = data.get("timestamp") or llm_files[0].stem + except Exception: + pass + + style_files = sorted(bench_dir.glob("style_*.json"), reverse=True) + if style_files: + try: + data = json.loads(style_files[0].read_text(encoding="utf-8")) + if isinstance(data, list) and data: + runs["style"]["timestamp"] = data[0].get("timestamp") or style_files[0].stem + except Exception: + pass + + # ── Plans bench: data/plans_bench_results/plans_*.json ──────────────── + plans_dir = _DATA_DIR / "plans_bench_results" + if plans_dir.exists(): + plans_files = sorted(plans_dir.glob("plans_*.json"), reverse=True) + if plans_files: + run_id = plans_files[0].stem + try: + d: dict = json.loads(plans_files[0].read_text(encoding="utf-8")) + all_scores = [ + r["total_score"] + for results in d.values() + for r in results + if isinstance(r, dict) and not r.get("error") + ] + avg = round(sum(all_scores) / len(all_scores), 3) if all_scores else None + try: + date_part = run_id.removeprefix("plans_") + date, time_part = date_part.split("_") + ts_display = f"{date} {time_part[:2]}:{time_part[2:4]}" + except Exception: + ts_display = run_id + runs["plans"]["timestamp"] = ts_display + runs["plans"]["score"] = avg + except Exception: + pass + + return runs + + @router.get("/dashboard") def get_dashboard() -> dict: - data_eval_threshold, eval_train_threshold = _load_thresholds() - last_eval_ts, last_eval_score = _find_latest_eval() - labeled_since = _count_labeled_since(last_eval_ts) + data_threshold, _train_threshold = _load_thresholds() + last_ts, last_score = _find_latest_classifier_bench() + labeled_since = _count_labeled_since(last_ts) corrections_pending, corrections_export_ready = _count_corrections() active_jobs = _get_active_jobs() + recent_bench = _get_recent_bench_runs() return { "labeled_since_last_eval": labeled_since, - "last_eval_timestamp": last_eval_ts, - "last_eval_best_score": last_eval_score, + "last_eval_timestamp": last_ts, + "last_eval_best_score": last_score, "active_jobs": active_jobs, "corrections_pending": corrections_pending, "corrections_export_ready": corrections_export_ready, + "recent_bench_runs": recent_bench, "signals": { - "data_to_eval": labeled_since >= data_eval_threshold, + "data_to_eval": labeled_since >= data_threshold, "eval_to_train": False, # future: implement delta-F1 comparison "train_to_fleet": False, # future: implement fleet sync signal }, diff --git a/app/models.py b/app/models.py index b4c05ae..ad08ecb 100644 --- a/app/models.py +++ b/app/models.py @@ -38,13 +38,17 @@ except ImportError: # pragma: no cover logger = logging.getLogger(__name__) _ROOT = Path(__file__).parent.parent -_MODELS_DIR: Path = _ROOT / "models" +_MODELS_DIR: Path = Path( + os.environ.get("AVOCET_MODELS_DIR", str(_ROOT / "models")) +) _QUEUE_DIR: Path = _ROOT / "data" # Service-specific model destinations. # cf-text models land on the NFS-mounted shared asset store so every cluster -# node can reach them without a separate download. Avocet classifiers stay local -# because they are fine-tuned in-place and are only consumed by avocet itself. +# node can reach them without a separate download. Avocet classifiers default +# to a local path but can be redirected via AVOCET_MODELS_DIR — set this to +# /Library/Assets/LLM/avocet/models on NFS-connected nodes to keep all model +# weights out of the repo directory. # Override via CF_TEXT_MODELS_DIR env var (useful for dev / non-NFS setups). _CF_TEXT_MODELS_DIR: Path = Path( os.environ.get("CF_TEXT_MODELS_DIR", "/Library/Assets/LLM/cf-text/models") diff --git a/web/src/views/ClassifierTab.vue b/web/src/views/ClassifierTab.vue index a3feef8..93a0474 100644 --- a/web/src/views/ClassifierTab.vue +++ b/web/src/views/ClassifierTab.vue @@ -325,7 +325,7 @@ function toggleCategory(models: AvailableModel[], checked: boolean) { async function loadModelCategories() { modelsLoading.value = true - const { data } = await useApiFetch('/api/benchmark/models') + const { data } = await useApiFetch('/api/cforch/models') modelsLoading.value = false if (data?.categories) { modelCategories.value = data.categories @@ -342,7 +342,7 @@ const modelCount = computed(() => modelNames.value.length) const labelNames = computed(() => { const canonical = Object.keys(LABEL_META) const inResults = new Set( - modelNames.value.flatMap(n => Object.keys(results.value!.models[n].per_label)) + modelNames.value.flatMap(n => Object.keys(results.value?.models[n]?.per_label ?? {})) ) return [...canonical.filter(l => inResults.has(l)), ...[...inResults].filter(l => !canonical.includes(l))] }) @@ -401,16 +401,16 @@ function formatDate(iso: string | null): string { // ── Data loading ───────────────────────────────────────────────────────────── async function loadResults() { loading.value = true - const { data } = await useApiFetch('/api/benchmark/results') + const { data } = await useApiFetch('/api/cforch/results') loading.value = false - if (data && Object.keys(data.models).length > 0) { + if (data?.models && Object.keys(data.models).length > 0) { results.value = data } } async function loadFineTunedModels() { - const { data } = await useApiFetch('/api/finetune/status') - if (Array.isArray(data)) fineTunedModels.value = data + const { data } = await useApiFetch<{ results: FineTunedModel[] }>('/api/train/results') + if (Array.isArray(data?.results)) fineTunedModels.value = data.results } // ── Benchmark run ──────────────────────────────────────────────────────────── @@ -428,7 +428,7 @@ function startBenchmark() { params.set('model_names', [...selectedModels.value].join(',')) } const qs = params.toString() - const url = `/api/benchmark/run${qs ? `?${qs}` : ''}` + const url = `/api/cforch/run${qs ? `?${qs}` : ''}` useApiSSE( url, async (event) => { @@ -457,7 +457,7 @@ function startBenchmark() { } async function cancelBenchmark() { - await fetch('/api/benchmark/cancel', { method: 'POST' }).catch(() => {}) + await fetch('/api/cforch/cancel', { method: 'POST' }).catch(() => {}) } // ── Fine-tune ───────────────────────────────────────────────────────────────── diff --git a/web/src/views/CompareTab.vue b/web/src/views/CompareTab.vue index ceb4ca1..6481649 100644 --- a/web/src/views/CompareTab.vue +++ b/web/src/views/CompareTab.vue @@ -71,32 +71,35 @@ rows="6" /> - +
- 🤖 Ollama Models - {{ cmpSelectedModels.size }} / {{ ollamaLlmModels.length }} + 🤖 LLM Models + {{ cmpSelectedModels.size }} / {{ llmSelectableModels.length }}
-
- +
+ {{ service }} +
+ +
@@ -232,10 +235,22 @@ const cmpResults = ref([]) const cmpEventSource = ref(null) // ── Computed ──────────────────────────────────────────────────────────────── -const ollamaLlmModels = computed(() => - llmModels.value.filter(m => m.service === 'ollama') +const LLM_SERVICES = new Set(['ollama', 'vllm', 'cf-text']) + +const llmSelectableModels = computed(() => + llmModels.value.filter(m => LLM_SERVICES.has(m.service)) ) +/** Group selectable models by service for the picker UI */ +const llmModelsByService = computed((): Record => { + const groups: Record = {} + for (const m of llmSelectableModels.value) { + if (!groups[m.service]) groups[m.service] = [] + groups[m.service].push(m) + } + return groups +}) + const llmTasksByType = computed((): Record => { const groups: Record = {} for (const t of llmTasks.value) { @@ -270,7 +285,7 @@ function toggleCmpModel(id: string, checked: boolean) { function toggleAllCmpModels(checked: boolean) { cmpSelectedModels.value = checked - ? new Set(ollamaLlmModels.value.map(m => m.id)) + ? new Set(llmSelectableModels.value.map(m => m.id)) : new Set() } @@ -288,9 +303,8 @@ async function loadLlmModels() { const { data } = await useApiFetch<{ models: CfOrchModel[] }>('/api/cforch/models') if (data?.models) { llmModels.value = data.models - // Pre-select all ollama models cmpSelectedModels.value = new Set( - data.models.filter(m => m.service === 'ollama').map(m => m.id) + data.models.filter(m => LLM_SERVICES.has(m.service)).map(m => m.id) ) } } diff --git a/web/src/views/DashboardView.vue b/web/src/views/DashboardView.vue index cfab7ab..f2c56db 100644 --- a/web/src/views/DashboardView.vue +++ b/web/src/views/DashboardView.vue @@ -28,9 +28,6 @@ labeled since last eval

-
- Run Eval -
@@ -40,18 +37,28 @@

Eval

-

- Last run: - {{ formattedEvalTime }} -

-

- Best score: - {{ formatScore(data.last_eval_best_score) }} -

+
+
+ {{ BENCH_LABELS[type as BenchType] ?? type }} + + {{ run.timestamp ? formatBenchTs(run.timestamp) : '—' }} + + + {{ formatScore(run.score) }} + +
+
Queue Finetune
+
+ Run Eval +
@@ -104,33 +111,49 @@ interface DashboardSignals { train_to_fleet: boolean } +interface BenchRun { + timestamp: string | null + metric: string | null + score: number | null +} + +type BenchType = 'classifier' | 'llm' | 'style' | 'plans' + interface DashboardData { labeled_since_last_eval: number last_eval_timestamp: string | null last_eval_best_score: number | null active_jobs: ActiveJob[] corrections_export_ready: number + recent_bench_runs: Record signals: DashboardSignals } +const BENCH_LABELS: Record = { + classifier: 'Classifier', + llm: 'LLM Eval', + style: 'Style', + plans: 'Planning', +} + const data = ref(null) const loading = ref(false) const error = ref(null) -const formattedEvalTime = computed(() => { - if (!data.value?.last_eval_timestamp) return 'Never' - const date = new Date(data.value.last_eval_timestamp) - if (isNaN(date.getTime())) return 'Unknown' - const now = Date.now() - const diff = now - date.getTime() - const mins = Math.floor(diff / 60000) - if (mins < 1) return 'just now' - if (mins < 60) return `${mins}m ago` - const hrs = Math.floor(mins / 60) - if (hrs < 24) return `${hrs}h ago` - const days = Math.floor(hrs / 24) - return `${days}d ago` -}) +function formatBenchTs(ts: string): string { + const date = new Date(ts) + if (!isNaN(date.getTime())) { + const diff = Date.now() - date.getTime() + const mins = Math.floor(diff / 60000) + if (mins < 1) return 'just now' + if (mins < 60) return `${mins}m ago` + const hrs = Math.floor(mins / 60) + if (hrs < 24) return `${hrs}h ago` + return `${Math.floor(hrs / 24)}d ago` + } + // Non-ISO: show as-is (plans bench uses "YYYY-MM-DD HH:MM") + return ts.length > 16 ? ts.slice(0, 16) : ts +} function formatScore(score: number): string { return `${(score * 100).toFixed(1)}%` @@ -285,6 +308,42 @@ onMounted(() => load()) .cta-btn:hover { background: color-mix(in srgb, var(--app-primary, #2A6080) 85%, black); } +/* ── Bench run table ── */ +.bench-run-table { + display: flex; + flex-direction: column; + gap: 0.3rem; +} + +.bench-run-row { + display: grid; + grid-template-columns: 6rem 1fr auto; + align-items: center; + gap: 0.5rem; + font-size: 0.82rem; +} + +.bench-type-label { + font-weight: 600; + color: var(--color-text, #1a2338); + font-size: 0.78rem; +} + +.bench-run-time { + color: var(--color-text-secondary, #6b7a99); + font-size: 0.78rem; +} + +.bench-run-score { + font-family: var(--font-mono, monospace); + font-size: 0.75rem; + font-weight: 600; + color: var(--app-primary, #2A6080); + background: color-mix(in srgb, var(--app-primary, #2A6080) 10%, transparent); + padding: 0.1rem 0.35rem; + border-radius: 0.25rem; +} + /* ── Job pills ── */ .job-row { display: flex; diff --git a/web/src/views/LlmEvalTab.vue b/web/src/views/LlmEvalTab.vue index 4475701..b307a25 100644 --- a/web/src/views/LlmEvalTab.vue +++ b/web/src/views/LlmEvalTab.vue @@ -302,7 +302,7 @@ const llmModelBadge = computed(() => { const llmTaskTypeCols = computed(() => { const types = new Set() for (const r of llmResults.value) { - for (const k of Object.keys(r.quality_by_task_type)) types.add(k) + for (const k of Object.keys(r.quality_by_task_type ?? {})) types.add(k) } return [...types].sort() }) @@ -338,7 +338,7 @@ const llmBestByCol = computed((): Record => { for (const col of llmTaskTypeCols.value) { bestId = ''; bestVal = -Infinity for (const r of llmResults.value) { - const v = r.quality_by_task_type[col] + const v = r.quality_by_task_type?.[col] if (v != null && v > bestVal) { bestVal = v; bestId = r.model_id } } best[col] = bestId