avocet/scripts/benchmark_plans.py
pyr0ball bce932461a feat: plans benchmark harness — model scoring for CF planning prompts
Adds benchmark_plans.py script, plans_bench API router, PlansBenchTab Vue
component, and registers /api/plans-bench in api.py. Also extends models
registry (cf-text catalog integration), cforch client, LlmEvalTab, and
ModelsView with cf-orch fleet support. Wires Planning mode into BenchmarkView.
2026-05-02 23:36:04 -07:00

719 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
"""CF-specific planning benchmark — compare base models before fine-tuning.
Sends held-out CircuitForge planning prompts to one or more models via the
cf-text (local) or cf-orch API, then scores responses against CF-specific
rubrics. Use this to select the best base model for SFT.
Scoring rubrics (each 0-1, summed to total/N):
- task_structure : uses checkbox syntax (- [ ]), git commit steps
- tier_awareness : mentions Free/Paid/Premium/Ultra tiers
- privacy_pillar : mentions privacy/local-inference/no-logging
- safety_pillar : mentions safety, human approval, or reversibility
- accessibility : mentions ND/accessibility/adaptive needs
- license_split : mentions MIT vs BSL or open-core model
- file_paths : uses plausible file path references
- cf_conventions : uses conda run -n cf, /Library/Development/, or known CF dirs
- paired_coherence : (paired only) plan references the design doc's feature name
- length_ok : 200–2500 words (under-short = hallucination risk; over-long = padding)
Usage
-----
# List available model targets
python scripts/benchmark_plans.py --list-models
# Run all held-out prompts against a single model, print report
python scripts/benchmark_plans.py --model llama3.2-3b
# Compare two models side-by-side
python scripts/benchmark_plans.py --compare llama3.2-3b mistral-7b
# Run with a custom API base (cf-text default: http://localhost:8080/v1)
python scripts/benchmark_plans.py --model llama3.2-3b --api-base http://localhost:8080/v1
# Export detailed results JSON
python scripts/benchmark_plans.py --model llama3.2-3b --output data/bench_results.json
"""
from __future__ import annotations
import argparse
import json
import re
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field, asdict
from pathlib import Path
from typing import Any
import httpx
# ── Paths ──────────────────────────────────────────────────────────────────────
# Directory two levels above this file (the script is invoked as
# ``scripts/benchmark_plans.py``, so this is the directory containing ``scripts/``).
_ROOT = Path(__file__).parent.parent
# ``data/`` under the root. NOTE(review): not referenced elsewhere in this file.
_DATA_DIR = _ROOT / "data"
# Default OpenAI-compatible API base for direct (non-cf-orch) runs.
CF_TEXT_BASE = "http://localhost:8080/v1"
# NOTE(review): not referenced elsewhere in this file — presumably kept for callers; confirm.
CF_ORCH_BASE = "http://localhost:8090/v1"
CF_COORD_URL = "http://10.1.10.71:7700" # cf-orch coordinator (LAN)
# ── Held-out prompts ───────────────────────────────────────────────────────────
# These are NOT in the training export (no matching docs in circuitforge-plans/).
# Each prompt exercises a different CF planning domain.
#
# Entry schema:
#   id               — stable identifier, selectable on the CLI via --prompts
#   name             — short label used in reports
#   domain           — free-form category tag (not read by any code in this file)
#   prompt           — the user message sent verbatim to the model
#   expected_signals — rubric names the prompt is meant to exercise
# NOTE(review): score_response() receives the entry but does not consult
# expected_signals; every response is scored on all rubrics.
HELD_OUT_PROMPTS: list[dict[str, Any]] = [
{
"id": "ho_001",
"name": "kiwi_barcode_ocr",
"domain": "feature_plan",
"prompt": (
"You are a senior engineer on Kiwi, a CircuitForge pantry-tracking product. "
"Write a detailed implementation plan for adding barcode scanning via device camera "
"and receipt OCR to the item-add flow.\n\n"
"The plan should include: file structure (create/modify), step-by-step task checklist "
"with checkboxes, any DB migrations, and git commit steps."
),
"expected_signals": ["task_structure", "file_paths", "cf_conventions"],
},
{
"id": "ho_002",
"name": "peregrine_ats_scoring",
"domain": "feature_design",
"prompt": (
"Write a design document for Peregrine: ATS keyword scoring for job applications.\n\n"
"Context: Peregrine users paste job descriptions and their resume. "
"We want to score how well the resume keywords match the JD and suggest rewrites. "
"Describe the architecture, data flow, and key design decisions."
),
"expected_signals": ["privacy_pillar", "tier_awareness", "license_split"],
},
{
"id": "ho_003",
"name": "tier_gate_local_llm",
"domain": "architecture",
"prompt": (
"Design the tier-gating architecture for a new CircuitForge product. "
"The product should:\n"
"- Default to local LLM inference for all tiers\n"
"- Unlock cloud LLM for Paid tier and above\n"
"- Keep fine-tuned model weights for Premium/Ultra only\n\n"
"Describe how the tier check integrates with the LLM router, "
"what happens when a Free user tries a Paid-tier feature, "
"and how BYOK (bring-your-own-key) fits in."
),
"expected_signals": ["tier_awareness", "privacy_pillar", "license_split"],
},
{
"id": "ho_004",
"name": "heimdall_webhook_plan",
"domain": "feature_plan",
"prompt": (
"Break the following Heimdall feature into a detailed implementation plan with "
"file structure and task checkboxes — Stripe webhook handler for subscription lifecycle.\n\n"
"Heimdall is the CircuitForge license server (FastAPI + SQLite). "
"The webhook needs to handle checkout.session.completed, "
"customer.subscription.updated, and customer.subscription.deleted events."
),
"expected_signals": ["task_structure", "file_paths", "safety_pillar"],
},
{
"id": "ho_005",
"name": "nd_accessible_onboarding",
"domain": "ux_design",
"prompt": (
"You are a product designer working on Harrier, a CircuitForge tool for "
"helping people navigate government benefits applications.\n\n"
"Design the onboarding flow for neurodivergent (ND) users. "
"Consider: ADHD time-blindness, executive function challenges, demand avoidance, "
"and rejection sensitivity. The flow should reduce cognitive load and "
"never use urgency or panic patterns."
),
"expected_signals": ["accessibility", "safety_pillar", "privacy_pillar"],
},
{
"id": "ho_006",
"name": "circuitforge_core_extraction",
"domain": "architecture",
"prompt": (
"Produce a CircuitForge-style design document for the following circuitforge-core "
"feature — shared ActivityPub federation module.\n\n"
"Background: Multiple CF products (Kiwi, Rook, Snipe) want to publish updates "
"to ActivityPub. Build it once in cf-core (MIT licensed) so all products can use it. "
"Design the module API, describe what belongs in MIT vs BSL, and note federation "
"privacy constraints."
),
"expected_signals": ["license_split", "privacy_pillar", "cf_conventions"],
},
{
"id": "ho_007",
"name": "snipe_trust_score_plan",
"domain": "feature_plan",
"prompt": (
"You are a senior engineer on Snipe, a CircuitForge eBay trust-scoring tool. "
"Write a step-by-step engineering plan for: seller trust score calculation.\n\n"
"The score should combine: feedback ratio, account age, item-specifics completeness, "
"listing photo quality, and shipping time accuracy. "
"Include file structure, test plan, and migration steps."
),
"expected_signals": ["task_structure", "file_paths", "safety_pillar"],
},
{
"id": "ho_008",
"name": "avocet_training_pipeline",
"domain": "feature_plan",
"prompt": (
"Break the following Avocet feature into a detailed implementation plan — "
"end-to-end fine-tuning pipeline from labeled JSONL to deployed GGUF model.\n\n"
"Avocet is the CircuitForge email classifier training tool. "
"The pipeline should: validate the dataset, run LoRA SFT via unsloth, "
"quantize to Q5_K_M GGUF, run the benchmark harness, and register the model "
"in the Avocet model queue if it beats the baseline."
),
"expected_signals": ["task_structure", "file_paths", "cf_conventions"],
},
{
"id": "ho_009",
"name": "privacy_data_flow",
"domain": "architecture",
"prompt": (
"Design the data privacy architecture for a CircuitForge cloud product. "
"Describe: what PII is collected, how it's stored, retention policy, "
"obfuscation strategy for cloud-side logs, and how consent is obtained "
"in plain language. The product handles job applications (resumes, cover letters)."
),
"expected_signals": ["privacy_pillar", "safety_pillar", "accessibility"],
},
{
"id": "ho_010",
"name": "git_workflow_doc",
"domain": "process_doc",
"prompt": (
"Write a developer process document for CircuitForge: conventional commit and "
"branch workflow for a BSL 1.1 open-core product.\n\n"
"Cover: commit message format (type: description), branch naming, "
"when to use feature branches vs direct main commits, "
"how the MIT/BSL split affects which commits go in which branch, "
"and how CI gates on gitleaks for secret scanning."
),
"expected_signals": ["license_split", "cf_conventions", "task_structure"],
},
]
# ── Rubric scoring ─────────────────────────────────────────────────────────────
# Compiled once at import time; score_response() counts findall() hits per rubric.
# All patterns except the checkbox one are case-insensitive.

# Unchecked markdown checkbox ("- [ ]").
_TASK_STRUCTURE_RE = re.compile(r"- \[ \]", re.MULTILINE)
# Any mention of staging or committing with git.
_COMMIT_RE = re.compile(r"git commit|git add", re.IGNORECASE)
# "<Tier> tier" or "tier <Tier>" (two groups, so findall() yields tuples; only the count is used).
_TIER_RE = re.compile(r"\b(Free|Paid|Premium|Ultra)\s+tier|\btier\s+(Free|Paid|Premium|Ultra)", re.IGNORECASE)
# Privacy vocabulary; prefixes such as "data.?reten" deliberately match retention/retained.
_PRIVACY_RE = re.compile(r"\b(privacy|local.?inference|no.?logging|no.?pii|user.?data|data.?reten|obfuscat)", re.IGNORECASE)
# Safety vocabulary (prefix matches, e.g. "reversib" → reversible/reversibility).
_SAFETY_RE = re.compile(r"\b(human.?approv|reversib|safety|safe.?default|fail.?safe|harm)", re.IGNORECASE)
# Accessibility vocabulary; "ND" carries a trailing \b so it only matches as a whole word.
_A11Y_RE = re.compile(r"\b(neurodiverg|ND\b|accessib|adaptive|ADHD|autism|executive.?function|demand.?avoid)", re.IGNORECASE)
# Licence vocabulary. The acronyms need a trailing \b: the pattern is
# case-insensitive, so a bare "MIT" prefix would also count "mitigate" and
# "mitigation" as licence mentions and inflate the license_split rubric.
_LICENSE_RE = re.compile(r"\b(MIT\b|BSL\b|open.?core|proprietary|commercial.?licens)", re.IGNORECASE)
# Path-like references rooted at a conventional source directory.
_FILE_PATH_RE = re.compile(r"(app/|tests?/|src/|scripts?/)\w[\w/.-]{3,}", re.IGNORECASE)
# CircuitForge-specific tooling and directory conventions.
_CF_CONV_RE = re.compile(r"(conda run -n cf|/Library/Development/CircuitForge|circuitforge-core|manage\.sh)", re.IGNORECASE)
@dataclass
class RubricScore:
    """Per-response rubric scores.

    Each field holds one rubric's score; the scorer in this file fills them
    with values between 0.0 and 1.0. All fields default to 0.0.
    """

    task_structure: float = 0.0
    tier_awareness: float = 0.0
    privacy_pillar: float = 0.0
    safety_pillar: float = 0.0
    accessibility: float = 0.0
    license_split: float = 0.0
    file_paths: float = 0.0
    cf_conventions: float = 0.0
    length_ok: float = 0.0

    def total(self) -> float:
        """Return the unweighted mean of every rubric field."""
        # as_dict() yields the fields in declaration order, so the summation
        # order matches an explicit field-by-field list.
        values = list(self.as_dict().values())
        return sum(values) / len(values)

    def as_dict(self) -> dict[str, float]:
        """Return the scores as a plain ``{rubric_name: score}`` dict."""
        return asdict(self)
def score_response(response: str, prompt_meta: dict[str, Any]) -> RubricScore:
    """Score one model response against every rubric.

    Args:
        response: Raw text returned by the model.
        prompt_meta: The prompt entry the response answers. Accepted for
            interface compatibility; it is not consulted by the scoring below.

    Returns:
        A RubricScore whose fields are each within 0.0–1.0.
    """

    def hit_ratio(pattern: re.Pattern[str], saturation: int) -> float:
        # Share of the saturation count reached by the pattern, capped at 1.0.
        return min(1.0, len(pattern.findall(response)) / saturation)

    # Length: 200–2500 words is healthy; outside that window = partial credit.
    word_total = len(response.split())
    if word_total < 200:
        length_score = word_total / 200
    elif word_total <= 2500:
        length_score = 1.0
    else:
        # Linear fall-off above 2500 words, reaching zero at 5000.
        length_score = max(0.0, 1.0 - (word_total - 2500) / 2500)

    # Task structure: 70% from checkboxes (saturating at 5), 30% from any git commit/add step.
    commit_bonus = 0.3 if _COMMIT_RE.search(response) else 0.0

    return RubricScore(
        task_structure=hit_ratio(_TASK_STRUCTURE_RE, 5) * 0.7 + commit_bonus,
        tier_awareness=hit_ratio(_TIER_RE, 2),
        privacy_pillar=hit_ratio(_PRIVACY_RE, 3),
        safety_pillar=hit_ratio(_SAFETY_RE, 2),
        accessibility=hit_ratio(_A11Y_RE, 2),
        license_split=hit_ratio(_LICENSE_RE, 2),
        # File paths: at least 3 plausible path references for full credit.
        file_paths=hit_ratio(_FILE_PATH_RE, 3),
        cf_conventions=hit_ratio(_CF_CONV_RE, 2),
        length_ok=length_score,
    )
# ── Model client ───────────────────────────────────────────────────────────────
# Registry of named model targets (shorthand → {api_base, model, description}).
#   api_base    — OpenAI-compatible base URL used for direct (non-cf-orch) runs
#   model       — model name sent in the request body; on the cf-orch path it is
#                 also the candidate passed to the coordinator's allocate call
#   description — human-readable label printed by --list-models
# Keys that are not in this registry are still accepted on the CLI and are
# sent as the raw model name against the default direct endpoint.
MODEL_REGISTRY: dict[str, dict[str, str]] = {
"deepseek-r1-1.5b": {
"api_base": CF_TEXT_BASE,
"model": "deepseek-r1-1.5b",
"description": "DeepSeek R1 1.5B distill (cf-orch catalog key)",
},
"deepseek-r1-7b-4bit": {
"api_base": CF_TEXT_BASE,
"model": "deepseek-r1-7b-4bit",
"description": "DeepSeek R1 7B distill, 4-bit (cf-orch catalog key)",
},
"deepseek-coder-6.7b-4bit": {
"api_base": CF_TEXT_BASE,
"model": "deepseek-coder-6.7b-4bit",
"description": "DeepSeek Coder 6.7B instruct, 4-bit (cf-orch catalog key)",
},
"granite-4.1-8b": {
"api_base": CF_TEXT_BASE,
"model": "granite-4.1-8b",
"description": "IBM Granite 4.1 8B, 4-bit (cf-orch catalog key)",
},
"qwen2.5-3b": {
"api_base": CF_TEXT_BASE,
"model": "qwen2.5-3b",
"description": "Qwen 2.5 3B Q4 GGUF (cf-orch catalog key, navi only)",
},
"qwen2.5-7b": {
"api_base": CF_TEXT_BASE,
"model": "qwen2.5-7b",
"description": "Qwen 2.5 7B Q4 GGUF (cf-orch catalog key, navi only)",
},
}
# ── cf-orch allocation ─────────────────────────────────────────────────────────
def _cforch_allocate(
    model_id: str,
    cforch_url: str,
    startup_timeout_s: float = 300.0,
) -> tuple[str, str] | None:
    """Ask the cf-orch coordinator for a cf-text instance serving ``model_id``.

    When the coordinator reports a cold start (``started`` true and ``warm``
    false), the coordinator's status endpoint is polled every 3 seconds until
    the matching instance is ``running``, is ``stopped``, or
    ``startup_timeout_s`` elapses. Progress is printed to stdout throughout.

    Args:
        model_id: Model identifier offered as the single allocation candidate.
        cforch_url: Base URL of the cf-orch coordinator.
        startup_timeout_s: Maximum time to wait for a cold-started instance.

    Returns:
        ``(service_url, allocation_id)`` on success, where ``service_url`` is
        the direct node URL exposing ``/v1/chat/completions``; ``None`` on any
        failure (request error, stopped service, or timeout).
    """
    try:
        alloc_resp = httpx.post(
            f"{cforch_url}/api/services/cf-text/allocate",
            json={
                "model_candidates": [model_id],
                "caller": "avocet",
                "pipeline": "plans_benchmark",
            },
            timeout=120.0,
        )
        alloc_resp.raise_for_status()
        payload = alloc_resp.json()
        service_url: str = payload["url"]
        allocation_id: str = payload.get("allocation_id", "")
        node_id: str = payload.get("node_id", "")
        gpu_id: int | None = payload.get("gpu_id")

        is_cold_start = payload.get("started", False) and not payload.get("warm", True)
        if not is_cold_start:
            # Already warm (or not freshly started): usable immediately.
            return service_url, allocation_id

        # Every progress line is flushed right away; per the original note the
        # output feeds an SSE generator that needs to see each line promptly.
        print(f" [cold start] loading {model_id!r} — polling every 3s…", flush=True)
        started_at = time.monotonic()
        deadline = started_at + startup_timeout_s
        probe_misses = 0
        while time.monotonic() < deadline:
            elapsed = time.monotonic() - started_at
            try:
                status = httpx.get(f"{cforch_url}/api/services/cf-text/status", timeout=5.0)
                if not status.is_success:
                    print(f" [cold start] status poll returned {status.status_code}, elapsed={elapsed:.0f}s", flush=True)
                else:
                    instances = status.json().get("instances", [])
                    match = next(
                        (inst for inst in instances
                         if inst.get("node_id") == node_id and inst.get("gpu_id") == gpu_id),
                        None,
                    )
                    if not match:
                        # Instance not listed (yet) by the coordinator.
                        probe_misses += 1
                        print(f" [cold start] waiting… elapsed={elapsed:.0f}s", flush=True)
                        if probe_misses >= 6:
                            # After repeated misses, fall back to probing the
                            # service directly; a failed probe is ignored and
                            # polling simply continues.
                            try:
                                health = httpx.get(f"{service_url}/health", timeout=3.0)
                                if health.is_success:
                                    print(f" [cold start] ready via health check in {elapsed:.0f}s", flush=True)
                                    return service_url, allocation_id
                            except Exception:
                                pass
                    else:
                        probe_misses = 0
                        state = match.get("state", "")
                        if state == "running":
                            print(f" [cold start] ready in {elapsed:.0f}s", flush=True)
                            return service_url, allocation_id
                        elif state == "stopped":
                            print(f" [cold start] failed — service stopped after {elapsed:.0f}s", flush=True)
                            return None
                        else:
                            # still starting — this line doubles as a keepalive
                            print(f" [cold start] state={state!r} elapsed={elapsed:.0f}s", flush=True)
            except Exception as poll_exc:
                print(f" [cold start] poll error: {poll_exc} elapsed={elapsed:.0f}s", flush=True)
            time.sleep(3.0)

        print(f" [cold start] timed out after {time.monotonic()-started_at:.0f}s", flush=True)
        return None
    except Exception as exc:
        print(f"[warn] cf-orch allocation failed for {model_id!r}: {exc}", file=sys.stderr)
        return None
def _call_model_direct(service_url: str, model: str, prompt: str, timeout: int = 600) -> tuple[str, float]:
    """Call an OpenAI-compatible /v1/chat/completions on a direct service URL.

    Args:
        service_url: Node URL without the ``/v1`` suffix; a trailing slash is tolerated.
        model: Model name sent in the request body.
        prompt: User message content.
        timeout: Request timeout in seconds.

    Returns:
        ``(text, latency_s)`` — the first choice's message content and the
        wall-clock time of the HTTP request.

    Raises:
        Whatever ``_call_model`` raises (HTTP errors, malformed responses).
    """
    # The request payload and response parsing are identical to _call_model();
    # only the URL prefix and default timeout differ, so delegate instead of
    # keeping a second copy that can drift.
    return _call_model(f"{service_url.rstrip('/')}/v1", model, prompt, timeout=timeout)
def _call_model(api_base: str, model: str, prompt: str, timeout: int = 180) -> tuple[str, float]:
    """Call an OpenAI-compatible /chat/completions endpoint.

    Args:
        api_base: API base URL including the version prefix (e.g. ``.../v1``).
            A trailing slash is tolerated.
        model: Model name sent in the request body.
        prompt: User message content.
        timeout: Request timeout in seconds.

    Returns:
        ``(text, latency_s)`` — the first choice's message content and the
        wall-clock time of the HTTP request (response parsing excluded).

    Raises:
        httpx.HTTPStatusError: On a non-2xx response (via ``raise_for_status``).
        KeyError / IndexError: If the response body lacks the expected
            ``choices[0].message.content`` structure.
    """
    # Normalise the base so "--api-base http://host/v1/" does not produce
    # "//chat/completions" (matches the handling in _call_model_direct).
    url = f"{api_base.rstrip('/')}/chat/completions"
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "max_tokens": 2048,
        "temperature": 0.2,
    }
    t0 = time.monotonic()
    resp = httpx.post(url, json=payload, timeout=timeout)
    resp.raise_for_status()
    latency = time.monotonic() - t0
    text = resp.json()["choices"][0]["message"]["content"]
    return text, latency
# ── Benchmark runner ───────────────────────────────────────────────────────────
@dataclass
class PromptResult:
    """Outcome of running a single prompt against a single model.

    Failed runs carry an ``error`` message together with an empty response,
    empty ``scores`` and zeroed numeric fields.
    """

    # Identity of the prompt and of the model it was sent to.
    prompt_id: str
    prompt_name: str
    model_key: str
    # Raw model output and request timing.
    response: str
    latency_s: float
    word_count: int
    # Per-rubric scores and their mean.
    scores: dict[str, float]
    total_score: float
    # None on success; the failure message otherwise.
    error: str | None = None
def run_benchmark(
    model_key: str,
    model_name: str,
    prompts: list[dict[str, Any]] | None = None,
    verbose: bool = False,
    # cf-orch path
    use_cforch: bool = False,
    cforch_url: str = CF_COORD_URL,
    # direct path (used when not cf-orch)
    api_base: str = CF_TEXT_BASE,
) -> list[PromptResult]:
    """Run every prompt through one model and score the responses.

    Args:
        model_key: Label recorded on each result (registry key or raw name).
        model_name: Model name sent to the API / offered to cf-orch.
        prompts: Prompt entries to run; defaults to ``HELD_OUT_PROMPTS``.
        verbose: Print one progress line per prompt.
        use_cforch: Allocate an instance via the cf-orch coordinator (once
            per call) and talk to it directly.
        cforch_url: Coordinator base URL, used only when ``use_cforch``.
        api_base: OpenAI-compatible base URL, used only when not ``use_cforch``.

    Returns:
        One PromptResult per prompt, in input order. A failing prompt does
        not abort the run: it yields a result with ``error`` set. If the
        cf-orch allocation fails, every prompt is returned as an error.
    """
    if prompts is None:
        prompts = HELD_OUT_PROMPTS

    def _failed(p: dict[str, Any], message: str) -> PromptResult:
        # Uniform shape for a prompt that produced no scorable response.
        return PromptResult(
            prompt_id=p["id"], prompt_name=p["name"], model_key=model_key,
            response="", latency_s=0.0, word_count=0, scores={}, total_score=0.0,
            error=message,
        )

    # Allocate once per model when using cf-orch
    service_url: str | None = None
    if use_cforch:
        print(f" Allocating {model_name!r} via cf-orch…", flush=True)
        alloc = _cforch_allocate(model_name, cforch_url)
        if alloc is None:
            return [_failed(p, f"cf-orch allocation failed for {model_name!r}") for p in prompts]
        # NOTE(review): the allocation id is never used afterwards — nothing in
        # this file releases the allocation. Confirm whether cf-orch expects
        # an explicit release.
        service_url, _alloc_id = alloc

    results: list[PromptResult] = []
    for p in prompts:
        if verbose:
            # Trailing separator keeps the score/error that follows on the
            # same line from being glued to the prompt name.
            print(f" [{p['id']}] {p['name']} … ", end="", flush=True)
        try:
            if service_url:
                response, latency = _call_model_direct(service_url, model_name, p["prompt"])
            else:
                response, latency = _call_model(api_base, model_name, p["prompt"])
            rubric = score_response(response, p)
            result = PromptResult(
                prompt_id=p["id"],
                prompt_name=p["name"],
                model_key=model_key,
                response=response,
                latency_s=round(latency, 2),
                word_count=len(response.split()),
                scores=rubric.as_dict(),
                total_score=round(rubric.total(), 3),
            )
            if verbose:
                print(f"score={result.total_score:.3f} ({result.word_count}w, {latency:.1f}s)")
        except Exception as exc:
            # Per-prompt boundary: record the failure and keep benchmarking.
            result = _failed(p, str(exc))
            if verbose:
                print(f"ERROR: {exc}")
        results.append(result)
    return results
# ── Reporting ──────────────────────────────────────────────────────────────────
def _print_single_report(results: list[PromptResult], model_key: str) -> None:
    """Print a per-model report: overall score, rubric breakdown, per-prompt scores.

    Args:
        results: All results for one model (successful and failed).
        model_key: Label shown in the report header.

    Averages are computed over successful prompts only; failed prompts are
    listed separately under "Errors". If every prompt failed, a single
    summary line is printed instead.
    """
    ok = [r for r in results if not r.error]
    err = [r for r in results if r.error]
    if not ok:
        print(f"\n[{model_key}] All {len(err)} prompts failed.\n")
        return

    avg_total = sum(r.total_score for r in ok) / len(ok)
    avg_latency = sum(r.latency_s for r in ok) / len(ok)
    # Aggregate per-rubric averages
    rubric_keys = list(ok[0].scores.keys())
    rubric_avgs = {k: sum(r.scores.get(k, 0) for r in ok) / len(ok) for k in rubric_keys}

    print(f"\n{'='*60}")
    print(f" Model : {model_key}")
    print(f" Prompts: {len(ok)}/{len(results)} passed ({len(err)} errors)")
    print(f" Overall score : {avg_total:.3f} (avg latency {avg_latency:.1f}s)")
    print(f"\n Rubric breakdown:")
    for k, v in sorted(rubric_avgs.items(), key=lambda x: -x[1]):
        # 20-cell bar; the glyph must be non-empty or the bar never renders.
        bar = "█" * int(v * 20)
        print(f" {k:<22} {v:.3f} {bar}")
    print(f"\n Per-prompt scores:")
    for r in sorted(ok, key=lambda x: -x.total_score):
        # One-character flag in both cases so the columns stay aligned.
        flag = "!" if r.total_score < 0.3 else " "
        print(f" {flag} {r.prompt_id} {r.prompt_name:<35} {r.total_score:.3f} ({r.word_count}w)")
    if err:
        print(f"\n Errors:")
        for r in err:
            print(f" {r.prompt_id} {r.prompt_name}: {r.error}")
    print(f"{'='*60}\n")
def _print_comparison_table(all_results: dict[str, list[PromptResult]]) -> None:
    """Print a prompt × model score table with per-model averages.

    Args:
        all_results: Mapping of model key → that model's results. Column
            order follows the mapping's iteration order.

    Rows are the prompts that actually appear in the results (first-seen
    order), so a run restricted to a subset of prompts is neither padded with
    zero rows nor has its averages diluted by prompts that were never run.
    Failed prompts (and prompts missing for a model) count as 0.0. With more
    than one model, the best score in each row is marked with ``*`` and the
    model with the highest average is announced as the winner.
    """
    model_keys = list(all_results.keys())

    # Scores by (model, prompt_id), plus the row labels in first-seen order.
    prompt_names: dict[str, str] = {}
    score_map: dict[tuple[str, str], float] = {}
    for mk, results in all_results.items():
        for r in results:
            prompt_names.setdefault(r.prompt_id, r.prompt_name)
            score_map[(mk, r.prompt_id)] = r.total_score if not r.error else 0.0

    if not model_keys or not prompt_names:
        # Nothing to tabulate; avoids max() on an empty sequence and a
        # division by zero in the averages.
        print("\n No benchmark results to compare.\n")
        return

    multi = len(model_keys) > 1
    col_w = 10

    def _cell(value: float, is_best: bool) -> str:
        # Pad each score cell to the header's column width so columns line up.
        marker = "*" if is_best and multi else " "
        return f"{value:.3f}{marker}".ljust(col_w)

    header = f"{'Prompt':<35}" + "".join(f"{mk[:col_w-1]:<{col_w}}" for mk in model_keys)
    print(f"\n{'='*len(header)}")
    print(" COMPARISON TABLE")
    print(f"{'='*len(header)}")
    print(f" {header}")
    print(f" {'-'*len(header)}")

    for pid, pname in prompt_names.items():
        row_scores = [score_map.get((mk, pid), 0.0) for mk in model_keys]
        best = max(row_scores)
        print(f" {pname:<35}" + "".join(_cell(v, v == best) for v in row_scores))
    print(f" {'-'*len(header)}")

    avgs: dict[str, float] = {}
    for mk in model_keys:
        vals = [score_map.get((mk, pid), 0.0) for pid in prompt_names]
        avgs[mk] = sum(vals) / len(vals)
    best_avg = max(avgs.values())
    print(f" {'AVERAGE':<35}" + "".join(_cell(avgs[mk], avgs[mk] == best_avg) for mk in model_keys))
    print(f"{'='*len(header)}\n")

    if multi:
        winner = max(avgs, key=lambda k: avgs[k])
        print(f" Winner: {winner} (avg {avgs[winner]:.3f})\n")
# ── CLI ────────────────────────────────────────────────────────────────────────
def main() -> None:
    """CLI entry point: parse arguments, run the benchmark(s), print and export results.

    Exits with status 1 when ``--prompts`` matches no known prompt ID, and
    with status 0 (after printing help) when neither ``--model`` nor
    ``--compare`` is given.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--list-models", action="store_true",
                        help="Print registered model shortcuts and exit")
    parser.add_argument("--model", metavar="KEY",
                        help="Benchmark a single model (registry key or raw model name)")
    parser.add_argument("--compare", nargs="+", metavar="KEY",
                        help="Compare two or more models side-by-side")
    parser.add_argument("--cforch", action="store_true",
                        help="Route inference through cf-orch coordinator (allocate per model)")
    parser.add_argument("--cforch-url", default=CF_COORD_URL, metavar="URL",
                        help=f"cf-orch coordinator URL (default: {CF_COORD_URL})")
    parser.add_argument("--api-base", default=None,
                        help="Direct API base URL when not using cf-orch")
    parser.add_argument("--model-name", default=None,
                        help="Override model name sent to API (single-model runs only)")
    parser.add_argument("--prompts", nargs="+", metavar="ID",
                        help="Run only specific prompt IDs (e.g. ho_001 ho_003)")
    parser.add_argument("--output", type=Path, default=None,
                        help="Write detailed JSON results to this path")
    parser.add_argument("--workers", type=int, default=1, metavar="N",
                        help="Run N models concurrently (default 1). Set to number of available nodes.")
    parser.add_argument("--verbose", "-v", action="store_true",
                        help="Print per-prompt progress")
    args = parser.parse_args()

    if args.list_models:
        # Size the key column to the longest key so descriptions stay aligned.
        key_w = max([20, *(len(k) for k in MODEL_REGISTRY)])
        print("\nRegistered model shortcuts:")
        for key, info in MODEL_REGISTRY.items():
            print(f" {key:<{key_w}} {info['description']}")
        print("\nDefault endpoints:")
        print(f" direct {CF_TEXT_BASE}")
        print(f" cf-orch {CF_COORD_URL}")
        return

    prompts = HELD_OUT_PROMPTS
    if args.prompts:
        requested = set(args.prompts)
        prompts = [p for p in HELD_OUT_PROMPTS if p["id"] in requested]
        if not prompts:
            print(f"No prompts matched IDs: {args.prompts}", file=sys.stderr)
            sys.exit(1)
        # A typo among otherwise valid IDs should not pass silently.
        unknown = sorted(requested - {p["id"] for p in prompts})
        if unknown:
            print(f"[warn] ignoring unknown prompt IDs: {unknown}", file=sys.stderr)

    model_keys: list[str] = []
    if args.compare:
        # De-duplicate while keeping the requested order: results are keyed by
        # model, so a repeated key would only run the same benchmark twice.
        model_keys = list(dict.fromkeys(args.compare))
    elif args.model:
        model_keys = [args.model]
    else:
        parser.print_help()
        sys.exit(0)

    # --model-name is documented as single-model only; applying it to several
    # models would benchmark the same model under different labels.
    model_name_override = args.model_name
    if model_name_override and len(model_keys) > 1:
        print("[warn] --model-name is ignored when benchmarking multiple models", file=sys.stderr)
        model_name_override = None

    print_lock = threading.Lock()

    def _run_one(mk: str) -> tuple[str, list[PromptResult]]:
        # Resolve the registry entry (if any), run the benchmark, print its report.
        reg = MODEL_REGISTRY.get(mk)
        if reg is not None:
            model_name = model_name_override or reg["model"]
            direct_base = args.api_base or reg["api_base"]
        else:
            model_name = model_name_override or mk
            direct_base = args.api_base or CF_TEXT_BASE
        if args.cforch:
            with print_lock:
                print(f"\nRunning [{mk}] via cf-orch ({args.cforch_url}) model={model_name}")
            results = run_benchmark(
                mk, model_name, prompts=prompts, verbose=args.verbose,
                use_cforch=True, cforch_url=args.cforch_url,
            )
        else:
            with print_lock:
                print(f"\nRunning [{mk}] → {direct_base} model={model_name}")
            results = run_benchmark(
                mk, model_name, prompts=prompts, verbose=args.verbose,
                api_base=direct_base,
            )
        with print_lock:
            _print_single_report(results, mk)
        return mk, results

    collected: dict[str, list[PromptResult]] = {}
    workers = max(1, args.workers)
    if workers == 1 or len(model_keys) == 1:
        for mk in model_keys:
            mk_out, results = _run_one(mk)
            collected[mk_out] = results
    else:
        with ThreadPoolExecutor(max_workers=workers) as pool:
            futures = [pool.submit(_run_one, mk) for mk in model_keys]
            for fut in as_completed(futures):
                mk_out, results = fut.result()
                collected[mk_out] = results

    # Re-key in the requested order so table columns and the JSON export do
    # not depend on which thread happened to finish first.
    all_results: dict[str, list[PromptResult]] = {mk: collected[mk] for mk in model_keys}

    if len(model_keys) > 1:
        _print_comparison_table(all_results)

    if args.output:
        args.output.parent.mkdir(parents=True, exist_ok=True)
        payload = {
            mk: [asdict(r) for r in results]
            for mk, results in all_results.items()
        }
        with open(args.output, "w", encoding="utf-8") as f:
            json.dump(payload, f, indent=2, ensure_ascii=False)
        print(f"Wrote detailed results to {args.output}")


if __name__ == "__main__":
    main()