From aa742bcfc05852e5022f096eb098040cc707398c Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Fri, 1 May 2026 23:30:04 -0700 Subject: [PATCH] feat: add GET /api/dashboard flywheel aggregate endpoint --- app/api.py | 3 + app/dashboard.py | 191 ++++++++++++++++++++++++++++++++++++++++ tests/test_dashboard.py | 122 +++++++++++++++++++++++++ 3 files changed, 316 insertions(+) create mode 100644 app/dashboard.py create mode 100644 tests/test_dashboard.py diff --git a/app/api.py b/app/api.py index 2930571..e439277 100644 --- a/app/api.py +++ b/app/api.py @@ -126,6 +126,9 @@ app.include_router(fetch_router, prefix="/api") from app.train.train import router as train_router app.include_router(train_router, prefix="/api/train") +from app.dashboard import router as dashboard_router +app.include_router(dashboard_router, prefix="/api") + # Static SPA — MUST be last (catches all unmatched paths) _DIST = _ROOT / "web" / "dist" diff --git a/app/dashboard.py b/app/dashboard.py new file mode 100644 index 0000000..95efefa --- /dev/null +++ b/app/dashboard.py @@ -0,0 +1,191 @@ +"""Avocet -- dashboard aggregate API. + +GET /api/dashboard returns the current flywheel state: + labeled_since_last_eval -- items labeled after the most recent eval run + last_eval_timestamp -- ISO timestamp of newest bench_results summary + last_eval_best_score -- best macro_f1 from that summary + active_jobs -- jobs with status queued or running + corrections_pending -- sft_candidates with status=needs_review + corrections_export_ready -- approved sft candidates with non-blank correction + signals -- computed booleans for UI nudge indicators + +Thresholds in label_tool.yaml pipeline: section: + pipeline: + data_eval_threshold: 50 # labeled items since last eval to trigger nudge + eval_train_threshold: 0.05 # improvement delta needed before retraining (future) +""" +from __future__ import annotations + +import json +import logging +import yaml +from pathlib import Path + +from fastapi import APIRouter + +logger = logging.getLogger(__name__) + +_ROOT = Path(__file__).parent.parent +_DATA_DIR: Path = _ROOT / "data" +_CONFIG_DIR: Path | None = None + +router = APIRouter() + +_DEFAULT_DATA_EVAL_THRESHOLD = 50 +_DEFAULT_EVAL_TRAIN_THRESHOLD = 0.05 + + +def set_data_dir(path: Path) -> None: + global _DATA_DIR + _DATA_DIR = path + +def set_config_dir(path: Path | None) -> None: + global _CONFIG_DIR + _CONFIG_DIR = path + +def _config_file() -> Path: + if _CONFIG_DIR is not None: + return _CONFIG_DIR / "label_tool.yaml" + return _ROOT / "config" / "label_tool.yaml" + +def _load_thresholds() -> tuple[int, float]: + f = _config_file() + if f.exists(): + try: + raw = yaml.safe_load(f.read_text(encoding="utf-8")) or {} + pipeline = raw.get("pipeline", {}) or {} + return ( + int(pipeline.get("data_eval_threshold", _DEFAULT_DATA_EVAL_THRESHOLD)), + float(pipeline.get("eval_train_threshold", _DEFAULT_EVAL_TRAIN_THRESHOLD)), + ) + except Exception as exc: + logger.warning("Failed to read pipeline thresholds: %s", exc) + return _DEFAULT_DATA_EVAL_THRESHOLD, _DEFAULT_EVAL_TRAIN_THRESHOLD + +def _load_score_records() -> list[dict]: + path = _DATA_DIR / "email_score.jsonl" + if not path.exists(): + return [] + records = [] + for line in path.read_text(encoding="utf-8").splitlines(): + line = line.strip() + if not line: + continue + try: + records.append(json.loads(line)) + except json.JSONDecodeError: + pass + return records + +def _find_latest_eval(results_dir_override: str = "") -> tuple[str | None, float | None]: + """Return (iso_timestamp, best_macro_f1) from the newest bench_results summary. + + Checks results_dir from cforch config if set, then falls back to + _ROOT/bench_results/. Returns (None, None) if no results exist. + """ + candidates = [] + if results_dir_override: + candidates.append(Path(results_dir_override)) + else: + f = _config_file() + if f.exists(): + try: + raw = yaml.safe_load(f.read_text(encoding="utf-8")) or {} + rd = (raw.get("cforch", {}) or {}).get("results_dir", "") + if rd: + candidates.append(Path(rd)) + except Exception: + pass + candidates.append(_ROOT / "bench_results") + + for rdir in candidates: + if not rdir.exists(): + continue + subdirs = sorted([d for d in rdir.iterdir() if d.is_dir()], key=lambda d: d.name) + for subdir in reversed(subdirs): + summary = subdir / "summary.json" + if summary.exists(): + try: + data = json.loads(summary.read_text(encoding="utf-8")) + ts = data.get("timestamp") or subdir.name + score = data.get("best_macro_f1") or data.get("macro_f1") + return ts, (float(score) if isinstance(score, (int, float)) else None) + except Exception: + pass + return None, None + +def _count_corrections() -> tuple[int, int]: + """Return (pending_count, export_ready_count).""" + pending = 0 + export_ready = 0 + candidates_path = _DATA_DIR / "sft_candidates.jsonl" + approved_path = _DATA_DIR / "sft_approved.jsonl" + if candidates_path.exists(): + for line in candidates_path.read_text(encoding="utf-8").splitlines(): + line = line.strip() + if not line: + continue + try: + r = json.loads(line) + if r.get("status") == "needs_review": + pending += 1 + except json.JSONDecodeError: + pass + if approved_path.exists(): + for line in approved_path.read_text(encoding="utf-8").splitlines(): + line = line.strip() + if not line: + continue + try: + r = json.loads(line) + if (r.get("status") == "approved" + and r.get("corrected_response") + and str(r["corrected_response"]).strip()): + export_ready += 1 + except json.JSONDecodeError: + pass + return pending, export_ready + +def _get_active_jobs() -> list[dict]: + """Query train SQLite DB for queued/running jobs. Returns [] if DB absent.""" + try: + from app.train.train import _DB_PATH, _db, _init_db + if not _DB_PATH.exists(): + return [] + _init_db() + with _db() as conn: + rows = conn.execute( + "SELECT id, type, status FROM jobs WHERE status IN ('queued', 'running')" + ).fetchall() + return [{"id": r["id"], "type": r["type"], "status": r["status"]} for r in rows] + except Exception as exc: + logger.warning("Failed to query train jobs DB: %s", exc) + return [] + +def _count_labeled_since(since_ts: str | None) -> int: + records = _load_score_records() + if since_ts is None: + return len(records) + return sum(1 for r in records if r.get("labeled_at", "") > since_ts) + + +@router.get("/dashboard") +def get_dashboard() -> dict: + data_eval_threshold, eval_train_threshold = _load_thresholds() + last_eval_ts, last_eval_score = _find_latest_eval() + labeled_since = _count_labeled_since(last_eval_ts) + corrections_pending, corrections_export_ready = _count_corrections() + active_jobs = _get_active_jobs() + return { + "labeled_since_last_eval": labeled_since, + "last_eval_timestamp": last_eval_ts, + "last_eval_best_score": last_eval_score, + "active_jobs": active_jobs, + "corrections_pending": corrections_pending, + "corrections_export_ready": corrections_export_ready, + "signals": { + "data_to_eval": labeled_since >= data_eval_threshold, + "eval_to_train": False, # future: implement delta-F1 comparison + "train_to_fleet": False, # future: implement fleet sync signal + }, + } diff --git a/tests/test_dashboard.py b/tests/test_dashboard.py new file mode 100644 index 0000000..e5b5da2 --- /dev/null +++ b/tests/test_dashboard.py @@ -0,0 +1,122 @@ +"""Tests for app/dashboard.py -- GET /api/dashboard.""" +import json +import pytest +import yaml +from fastapi.testclient import TestClient +from pathlib import Path + + +@pytest.fixture(autouse=True) +def reset_globals(tmp_path): + from app import dashboard as dash_module + dash_module.set_data_dir(tmp_path) + dash_module.set_config_dir(tmp_path) + yield + + +@pytest.fixture +def client(): + from app.api import app + return TestClient(app) + + +def _write_score(tmp_path: Path, records: list[dict]) -> None: + (tmp_path / "email_score.jsonl").write_text( + "\n".join(json.dumps(r) for r in records) + "\n" + ) + +def _write_summary(tmp_path: Path, run_id: str, ts: str, score: float) -> None: + run_dir = tmp_path / "bench_results" / run_id + run_dir.mkdir(parents=True) + (run_dir / "summary.json").write_text( + json.dumps({"timestamp": ts, "best_macro_f1": score}) + ) + + +def test_dashboard_returns_expected_keys(client): + r = client.get("/api/dashboard") + assert r.status_code == 200 + data = r.json() + for key in ("labeled_since_last_eval", "last_eval_timestamp", "last_eval_best_score", + "active_jobs", "corrections_pending", "corrections_export_ready", "signals"): + assert key in data, f"missing key: {key}" + for sig in ("data_to_eval", "eval_to_train", "train_to_fleet"): + assert sig in data["signals"], f"missing signal: {sig}" + + +def test_dashboard_empty_state(client): + r = client.get("/api/dashboard") + assert r.status_code == 200 + data = r.json() + assert data["labeled_since_last_eval"] == 0 + assert data["last_eval_timestamp"] is None + assert data["last_eval_best_score"] is None + assert data["active_jobs"] == [] + assert data["corrections_pending"] == 0 + assert data["corrections_export_ready"] == 0 + + +def test_labeled_since_counts_all_when_no_eval(client, tmp_path): + _write_score(tmp_path, [ + {"id": "a", "label": "neutral", "labeled_at": "2026-05-01T10:00:00+00:00"}, + {"id": "b", "label": "neutral", "labeled_at": "2026-05-01T11:00:00+00:00"}, + ]) + r = client.get("/api/dashboard") + assert r.json()["labeled_since_last_eval"] == 2 + + +def test_labeled_since_filters_by_eval_timestamp(client, tmp_path): + _write_summary(tmp_path, "2026-05-01-100000", "2026-05-01T10:00:00+00:00", 0.80) + _write_score(tmp_path, [ + {"id": "a", "label": "neutral", "labeled_at": "2026-05-01T09:00:00+00:00"}, + {"id": "b", "label": "neutral", "labeled_at": "2026-05-01T11:00:00+00:00"}, + ]) + (tmp_path / "label_tool.yaml").write_text( + yaml.dump({"cforch": {"results_dir": str(tmp_path / "bench_results")}}) + ) + r = client.get("/api/dashboard") + data = r.json() + assert data["labeled_since_last_eval"] == 1 + assert abs(data["last_eval_best_score"] - 0.80) < 0.001 + + +def test_data_to_eval_false_below_threshold(client, tmp_path): + _write_score(tmp_path, [{"id": str(i), "label": "neutral", + "labeled_at": "2026-05-01T10:00:00+00:00"} for i in range(10)]) + (tmp_path / "label_tool.yaml").write_text(yaml.dump({"pipeline": {"data_eval_threshold": 50}})) + r = client.get("/api/dashboard") + assert r.json()["signals"]["data_to_eval"] is False + + +def test_data_to_eval_true_at_threshold(client, tmp_path): + _write_score(tmp_path, [{"id": str(i), "label": "neutral", + "labeled_at": "2026-05-01T10:00:00+00:00"} for i in range(50)]) + (tmp_path / "label_tool.yaml").write_text(yaml.dump({"pipeline": {"data_eval_threshold": 50}})) + r = client.get("/api/dashboard") + assert r.json()["signals"]["data_to_eval"] is True + + +def test_corrections_pending_count(client, tmp_path): + candidates = [ + {"id": "c1", "status": "needs_review"}, + {"id": "c2", "status": "needs_review"}, + {"id": "c3", "status": "discarded"}, + ] + (tmp_path / "sft_candidates.jsonl").write_text( + "\n".join(json.dumps(c) for c in candidates) + "\n" + ) + r = client.get("/api/dashboard") + assert r.json()["corrections_pending"] == 2 + + +def test_corrections_export_ready_count(client, tmp_path): + approved = [ + {"id": "a1", "status": "approved", "corrected_response": "Good answer"}, + {"id": "a2", "status": "approved", "corrected_response": ""}, + {"id": "a3", "status": "approved", "corrected_response": "Another answer"}, + ] + (tmp_path / "sft_approved.jsonl").write_text( + "\n".join(json.dumps(a) for a in approved) + "\n" + ) + r = client.get("/api/dashboard") + assert r.json()["corrections_export_ready"] == 2