avocet/app/dashboard.py

191 lines
6.9 KiB
Python

"""Avocet -- dashboard aggregate API.
GET /api/dashboard returns the current flywheel state:
labeled_since_last_eval -- items labeled after the most recent eval run
last_eval_timestamp -- ISO timestamp of newest bench_results summary
last_eval_best_score -- best macro_f1 from that summary
active_jobs -- jobs with status queued or running
corrections_pending -- sft_candidates with status=needs_review
corrections_export_ready -- approved sft candidates with non-blank correction
signals -- computed booleans for UI nudge indicators
Thresholds are read from the pipeline: section of label_tool.yaml:
    pipeline:
      data_eval_threshold: 50      # labeled items since last eval to trigger nudge
      eval_train_threshold: 0.05   # improvement delta needed before retraining (future)
"""
from __future__ import annotations
import json
import logging
import yaml
from pathlib import Path
from fastapi import APIRouter
logger = logging.getLogger(__name__)
# Project root: the directory two levels above this file.
_ROOT = Path(__file__).parent.parent
# Directory holding email_score.jsonl and the sft_*.jsonl files; replaced by set_data_dir().
_DATA_DIR: Path = _ROOT / "data"
# Directory containing label_tool.yaml; None means <_ROOT>/config (see _config_file()).
_CONFIG_DIR: Path | None = None
# get_dashboard() registers GET /dashboard on this router.
router = APIRouter()
# Fallbacks used when label_tool.yaml is missing, lacks a pipeline: key, or fails to parse.
_DEFAULT_DATA_EVAL_THRESHOLD = 50
_DEFAULT_EVAL_TRAIN_THRESHOLD = 0.05
def set_data_dir(path: Path) -> None:
    """Redirect the data directory used to locate the dashboard's JSONL inputs.

    Rebinds the module-level ``_DATA_DIR``; every later read of
    email_score.jsonl / sft_candidates.jsonl / sft_approved.jsonl uses *path*.
    """
    global _DATA_DIR
    _DATA_DIR = path
def set_config_dir(path: Path | None) -> None:
    """Override (or, with None, reset) the directory searched for label_tool.yaml.

    Rebinds the module-level ``_CONFIG_DIR`` consulted by ``_config_file()``.
    """
    global _CONFIG_DIR
    _CONFIG_DIR = path
def _config_file() -> Path:
    """Return the path of label_tool.yaml.

    Uses the directory registered via ``set_config_dir()`` when one is set,
    otherwise ``<_ROOT>/config``. The file is not required to exist.
    """
    config_dir = (_ROOT / "config") if _CONFIG_DIR is None else _CONFIG_DIR
    return config_dir / "label_tool.yaml"
def _load_thresholds() -> tuple[int, float]:
    """Return ``(data_eval_threshold, eval_train_threshold)`` from label_tool.yaml.

    Values come from the ``pipeline:`` section; a missing key falls back to its
    module default. If the file does not exist, or reading / parsing /
    int-float conversion fails, both defaults are returned and the failure is
    logged as a warning (best effort: the dashboard must still render).
    """
    defaults = (_DEFAULT_DATA_EVAL_THRESHOLD, _DEFAULT_EVAL_TRAIN_THRESHOLD)
    cfg_path = _config_file()
    if not cfg_path.exists():
        return defaults
    try:
        loaded = yaml.safe_load(cfg_path.read_text(encoding="utf-8")) or {}
        section = loaded.get("pipeline", {}) or {}
        data_eval = int(section.get("data_eval_threshold", _DEFAULT_DATA_EVAL_THRESHOLD))
        eval_train = float(section.get("eval_train_threshold", _DEFAULT_EVAL_TRAIN_THRESHOLD))
    except Exception as exc:
        logger.warning("Failed to read pipeline thresholds: %s", exc)
        return defaults
    return data_eval, eval_train
def _load_score_records() -> list[dict]:
    """Return every JSON object in ``<_DATA_DIR>/email_score.jsonl``.

    Blank lines, lines that are not valid JSON, and JSON values that are not
    objects (``null``, arrays, scalars) are skipped, so one bad line cannot
    break the dashboard. Returns [] when the file does not exist.
    """
    path = _DATA_DIR / "email_score.jsonl"
    if not path.exists():
        return []
    records: list[dict] = []
    # Iterate the file's lines (\n, \r\n) instead of str.splitlines(): the latter
    # also breaks on U+2028 / U+0085 etc., which may appear unescaped inside JSON
    # strings and would split (and thus drop) a valid record.
    with path.open(encoding="utf-8") as fh:
        for raw_line in fh:
            line = raw_line.strip()
            if not line:
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                continue  # tolerate a corrupt or partially written line
            # Callers use record.get(...); a non-object value would crash them.
            if isinstance(record, dict):
                records.append(record)
    return records
def _eval_result_dirs(results_dir_override: str) -> list[Path]:
    """Return the bench results directories to search, highest priority first."""
    candidates: list[Path] = []
    if results_dir_override:
        candidates.append(Path(results_dir_override))
    else:
        cfg = _config_file()
        if cfg.exists():
            try:
                raw = yaml.safe_load(cfg.read_text(encoding="utf-8")) or {}
                rd = (raw.get("cforch", {}) or {}).get("results_dir", "")
                if rd:
                    candidates.append(Path(rd))
            except Exception as exc:
                # Best effort: a broken config only loses this candidate.
                logger.warning("Failed to read cforch results_dir: %s", exc)
    # The in-repo bench_results directory is always searched last.
    candidates.append(_ROOT / "bench_results")
    return candidates


def _coerce_score(value: object) -> float | None:
    """Return *value* as a float if it is a real number (bools excluded), else None."""
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return None
    try:
        return float(value)
    except OverflowError:  # an int too large for a float is not a usable score
        return None


def _newest_summary(rdir: Path) -> tuple[str, float | None] | None:
    """Return (timestamp, score) from the newest readable summary.json in *rdir*, or None."""
    # is_dir() rather than exists(): iterdir() raises on a plain file at this path.
    if not rdir.is_dir():
        return None
    try:
        runs = sorted(
            (d for d in rdir.iterdir() if d.is_dir()),
            key=lambda d: d.name,
            reverse=True,  # newest first, assuming run dirs are named sortably
        )
    except OSError as exc:
        logger.warning("Failed to list results dir %s: %s", rdir, exc)
        return None
    for run in runs:
        summary = run / "summary.json"
        if not summary.exists():
            continue
        try:
            data = json.loads(summary.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            continue  # unreadable or corrupt summary: fall back to an older run
        if not isinstance(data, dict):
            continue
        # Check for None instead of using `or`, so a legitimate 0.0 best score is kept.
        score = _coerce_score(data.get("best_macro_f1"))
        if score is None:
            score = _coerce_score(data.get("macro_f1"))
        # str() so callers can compare it against string labeled_at values.
        return str(data.get("timestamp") or run.name), score
    return None


def _find_latest_eval(results_dir_override: str = "") -> tuple[str | None, float | None]:
    """Return ``(timestamp, best_macro_f1)`` from the newest bench_results summary.

    Directories searched, in order:

    1. ``results_dir_override`` when given; otherwise ``cforch.results_dir``
       from label_tool.yaml, when that key is set.
    2. ``_ROOT/bench_results`` (always checked as the fallback).

    Within a directory, run subdirectories are visited in descending name order.
    The first one with a readable ``summary.json`` object wins. The timestamp
    is its ``timestamp`` field (or the subdirectory name), returned as a string.
    The score is ``best_macro_f1``, falling back to ``macro_f1``, or None when
    neither is numeric.

    Returns (None, None) if no summary can be read.
    """
    for rdir in _eval_result_dirs(results_dir_override):
        found = _newest_summary(rdir)
        if found is not None:
            return found
    return None, None
def _read_jsonl_objects(path: Path) -> list[dict]:
    """Return the JSON objects in the JSONL file at *path* ([] if it is absent).

    Blank lines, malformed JSON and non-object values are skipped so that one
    bad line cannot break the dashboard.
    """
    if not path.exists():
        return []
    objects: list[dict] = []
    # Iterate the file's lines (\n, \r\n) rather than str.splitlines(), which also
    # splits on U+2028 / U+0085 etc. that may appear unescaped inside JSON strings.
    with path.open(encoding="utf-8") as fh:
        for raw_line in fh:
            line = raw_line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                continue
            if isinstance(obj, dict):
                objects.append(obj)
    return objects


def _count_corrections() -> tuple[int, int]:
    """Return ``(pending_count, export_ready_count)`` for SFT corrections.

    pending_count: records in ``sft_candidates.jsonl`` whose status is
    "needs_review".
    export_ready_count: records in ``sft_approved.jsonl`` whose status is
    "approved" and whose ``corrected_response`` is non-empty after stripping
    whitespace.
    Missing files count as zero.
    """
    pending = sum(
        1
        for rec in _read_jsonl_objects(_DATA_DIR / "sft_candidates.jsonl")
        if rec.get("status") == "needs_review"
    )
    export_ready = 0
    for rec in _read_jsonl_objects(_DATA_DIR / "sft_approved.jsonl"):
        response = rec.get("corrected_response")
        if rec.get("status") == "approved" and response and str(response).strip():
            export_ready += 1
    return pending, export_ready
def _get_active_jobs() -> list[dict]:
    """List train jobs whose status is 'queued' or 'running'.

    Uses the train module's DB helpers. Returns [] when the DB file does not
    exist. Also returns [], after logging a warning, when the import, schema
    init or query fails for any reason.
    """
    wanted_columns = ("id", "type", "status")
    try:
        # Imported inside the try so an unavailable train module degrades to [].
        from app.train.train import _DB_PATH, _db, _init_db
        if not _DB_PATH.exists():
            return []
        _init_db()
        with _db() as conn:
            active = conn.execute(
                "SELECT id, type, status FROM jobs WHERE status IN ('queued', 'running')"
            ).fetchall()
            return [{col: row[col] for col in wanted_columns} for row in active]
    except Exception as exc:
        logger.warning("Failed to query train jobs DB: %s", exc)
        return []
def _count_labeled_since(since_ts: str | None) -> int:
    """Count score records labeled strictly after *since_ts*.

    With ``since_ts=None`` (no eval found) every loaded record counts.
    Otherwise a record counts only if its ``labeled_at`` value, as a string,
    sorts after ``str(since_ts)``. Records lacking ``labeled_at`` (missing or
    JSON null) and non-object records never count.

    NOTE(review): this is a plain string comparison. It is only meaningful if
    labeled_at and the eval timestamp share one lexically sortable format
    (e.g. ISO 8601 in the same timezone) — confirm against the writers.
    """
    records = _load_score_records()
    if since_ts is None:
        return len(records)
    cutoff = str(since_ts)
    count = 0
    for record in records:
        if not isinstance(record, dict):
            continue
        labeled_at = record.get("labeled_at")
        # None cannot be ordered against a str; treat it like a missing value.
        if labeled_at is not None and str(labeled_at) > cutoff:
            count += 1
    return count
@router.get("/dashboard")
def get_dashboard() -> dict:
    """Return the aggregated flywheel state consumed by the dashboard UI.

    Keys: labeled_since_last_eval, last_eval_timestamp, last_eval_best_score,
    active_jobs, corrections_pending, corrections_export_ready, and ``signals``,
    a dict of booleans driving the UI nudge indicators.
    """
    # The eval->train threshold is loaded but unused until eval_to_train is implemented.
    data_eval_threshold, _eval_train_threshold = _load_thresholds()
    last_eval_ts, last_eval_score = _find_latest_eval()
    labeled_since = _count_labeled_since(last_eval_ts)
    pending, export_ready = _count_corrections()
    jobs = _get_active_jobs()

    signals = {
        # Enough new labels have accumulated since the most recent eval run.
        "data_to_eval": labeled_since >= data_eval_threshold,
        "eval_to_train": False,  # future: implement delta-F1 comparison
        "train_to_fleet": False,  # future: implement fleet sync signal
    }
    return {
        "labeled_since_last_eval": labeled_since,
        "last_eval_timestamp": last_eval_ts,
        "last_eval_best_score": last_eval_score,
        "active_jobs": jobs,
        "corrections_pending": pending,
        "corrections_export_ready": export_ready,
        "signals": signals,
    }