feat(avocet): add /api/finetune/status and /api/finetune/run endpoints
This commit is contained in:
parent
64fd19a7b6
commit
ef8adfb035
2 changed files with 173 additions and 0 deletions
69
app/api.py
69
app/api.py
|
|
@ -340,6 +340,75 @@ def run_benchmark(include_slow: bool = False):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Finetune endpoints
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@app.get("/api/finetune/status")
def get_finetune_status():
    """Scan models/ for training_info.json files. Returns [] if none exist.

    Each immediate subdirectory of ``models/`` is checked for a
    ``training_info.json``; every file that parses as JSON contributes one
    dict to the response list. Unreadable or malformed files are skipped so
    a single corrupt training run cannot break the whole status listing.
    """
    models_dir = _ROOT / "models"
    if not models_dir.exists():
        return []
    results = []
    # Sort for a deterministic response order (iterdir() order is arbitrary).
    for sub in sorted(models_dir.iterdir()):
        if not sub.is_dir():
            continue
        info_path = sub / "training_info.json"
        if not info_path.exists():
            continue
        try:
            results.append(json.loads(info_path.read_text(encoding="utf-8")))
        except (OSError, json.JSONDecodeError):
            # Best-effort: skip unreadable/corrupt files rather than failing
            # the endpoint — but don't swallow unrelated exceptions.
            continue
    return results
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/api/finetune/run")
def run_finetune_endpoint(
    model: str = "deberta-small",
    epochs: int = 5,
    score: list[str] = Query(default=[]),
):
    """Spawn finetune_classifier.py and stream stdout as SSE progress events.

    Query parameters:
        model:  preset name forwarded to the script as ``--model``.
        epochs: training epoch count forwarded as ``--epochs``.
        score:  repeatable; each value is forwarded as a ``--score`` argument.

    Emits one ``progress`` event per non-empty stdout line, then either a
    ``complete`` event (exit code 0) or an ``error`` event (non-zero exit or
    an exception while spawning/reading the process).
    """
    import subprocess

    # NOTE(review): hard-coded interpreter path — assumes this conda env
    # exists on the host; confirm for other deployments.
    python_bin = "/devl/miniconda3/envs/job-seeker-classifiers/bin/python"
    script = str(_ROOT / "scripts" / "finetune_classifier.py")
    cmd = [python_bin, script, "--model", model, "--epochs", str(epochs)]
    for score_file in score:
        cmd.extend(["--score", score_file])

    def generate():
        proc = None
        try:
            proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,  # interleave stderr into the stream
                text=True,
                bufsize=1,  # line-buffered so progress arrives promptly
                cwd=str(_ROOT),
            )
            for raw_line in proc.stdout:
                line = raw_line.rstrip()
                if line:
                    yield f"data: {json.dumps({'type': 'progress', 'message': line})}\n\n"
            proc.wait()
            if proc.returncode == 0:
                yield f"data: {json.dumps({'type': 'complete'})}\n\n"
            else:
                yield f"data: {json.dumps({'type': 'error', 'message': f'Process exited with code {proc.returncode}'})}\n\n"
        except Exception as exc:
            yield f"data: {json.dumps({'type': 'error', 'message': str(exc)})}\n\n"
        finally:
            # If the client disconnects mid-stream the generator is closed
            # and execution lands here: don't leave an orphaned training
            # process running.
            if proc is not None and proc.poll() is None:
                proc.terminate()

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
|
||||||
|
|
||||||
|
|
||||||
@app.get("/api/fetch/stream")
|
@app.get("/api/fetch/stream")
|
||||||
def fetch_stream(
|
def fetch_stream(
|
||||||
accounts: str = Query(default=""),
|
accounts: str = Query(default=""),
|
||||||
|
|
|
||||||
|
|
@ -325,3 +325,107 @@ def test_fetch_stream_with_mock_imap(client, config_dir, data_dir):
|
||||||
assert "start" in types
|
assert "start" in types
|
||||||
assert "done" in types
|
assert "done" in types
|
||||||
assert "complete" in types
|
assert "complete" in types
|
||||||
|
|
||||||
|
|
||||||
|
# ---- /api/finetune/status tests ----
|
||||||
|
|
||||||
|
def test_finetune_status_returns_empty_when_no_models_dir(client):
    """The status endpoint yields an empty JSON list when models/ is absent."""
    response = client.get("/api/finetune/status")

    assert response.status_code == 200
    assert response.json() == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_finetune_status_returns_training_info(client):
    """A training_info.json under models/ shows up as one status entry."""
    import json as _json
    import shutil

    from app import api as api_module

    run_dir = api_module._ROOT / "models" / "avocet-deberta-small-test"
    run_dir.mkdir(parents=True, exist_ok=True)
    payload = {
        "name": "avocet-deberta-small",
        "base_model_id": "cross-encoder/nli-deberta-v3-small",
        "val_macro_f1": 0.712,
        "timestamp": "2026-03-15T12:00:00Z",
        "sample_count": 401,
    }
    (run_dir / "training_info.json").write_text(_json.dumps(payload))

    try:
        response = client.get("/api/finetune/status")
        assert response.status_code == 200
        assert any(
            entry["name"] == "avocet-deberta-small" for entry in response.json()
        )
    finally:
        # Always remove the fixture directory so other tests see a clean tree.
        shutil.rmtree(run_dir)
|
||||||
|
|
||||||
|
|
||||||
|
def test_finetune_run_streams_sse_events(client):
    """The run endpoint responds with an SSE (text/event-stream) body."""
    from unittest.mock import MagicMock, patch

    fake_proc = MagicMock()
    fake_proc.stdout = iter(["Training epoch 1\n", "Done\n"])
    fake_proc.returncode = 0
    fake_proc.wait = MagicMock()

    with patch("subprocess.Popen", return_value=fake_proc):
        response = client.get("/api/finetune/run?model=deberta-small&epochs=1")

    assert response.status_code == 200
    assert "text/event-stream" in response.headers.get("content-type", "")
|
||||||
|
|
||||||
|
|
||||||
|
def test_finetune_run_emits_complete_on_success(client):
    """A zero exit code from the subprocess produces a complete SSE event."""
    from unittest.mock import MagicMock, patch

    fake_proc = MagicMock()
    fake_proc.stdout = iter(["progress line\n"])
    fake_proc.returncode = 0
    fake_proc.wait = MagicMock()

    with patch("subprocess.Popen", return_value=fake_proc):
        response = client.get("/api/finetune/run?model=deberta-small&epochs=1")

    assert '{"type": "complete"}' in response.text
|
||||||
|
|
||||||
|
|
||||||
|
def test_finetune_run_emits_error_on_nonzero_exit(client):
    """A non-zero exit code from the subprocess produces an error SSE event."""
    from unittest.mock import MagicMock, patch

    fake_proc = MagicMock()
    fake_proc.stdout = iter([])
    fake_proc.returncode = 1
    fake_proc.wait = MagicMock()

    with patch("subprocess.Popen", return_value=fake_proc):
        response = client.get("/api/finetune/run?model=deberta-small&epochs=1")

    assert '"type": "error"' in response.text
|
||||||
|
|
||||||
|
|
||||||
|
def test_finetune_run_passes_score_files_to_subprocess(client):
    """Repeated ?score= query values become repeated --score CLI arguments."""
    from unittest.mock import MagicMock, patch

    recorded_cmd = []

    def fake_popen(cmd, **kwargs):
        # Capture the command line, then hand back a stub that exits cleanly.
        recorded_cmd.extend(cmd)
        stub = MagicMock()
        stub.stdout = iter([])
        stub.returncode = 0
        stub.wait = MagicMock()
        return stub

    with patch("subprocess.Popen", side_effect=fake_popen):
        client.get(
            "/api/finetune/run?model=deberta-small&epochs=1&score=run1.jsonl&score=run2.jsonl"
        )

    assert "--score" in recorded_cmd
    assert recorded_cmd.count("--score") == 2
    assert "run1.jsonl" in recorded_cmd
    assert "run2.jsonl" in recorded_cmd
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue