From 30f19711ec38d6bd3b4b830318252e5f9bfdc59b Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Sun, 15 Mar 2026 18:09:20 -0700 Subject: [PATCH] feat(avocet): add cancel endpoints for benchmark and finetune jobs Adds POST /api/benchmark/cancel and POST /api/finetune/cancel endpoints that terminate the running subprocess (kill on 3s timeout), and updates the run generators to emit a cancelled SSE event instead of error when the job was intentionally stopped. --- app/api.py | 85 +++++++++++++++++++++++++------- tests/test_api.py | 120 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 187 insertions(+), 18 deletions(-) diff --git a/app/api.py b/app/api.py index 83490b5..c12da43 100644 --- a/app/api.py +++ b/app/api.py @@ -22,6 +22,11 @@ _DATA_DIR: Path = _ROOT / "data" # overridable in tests via set_data_dir() _MODELS_DIR: Path = _ROOT / "models" # overridable in tests via set_models_dir() _CONFIG_DIR: Path | None = None # None = use real path +# Process registry for running jobs — used by cancel endpoints. +# Keys: "benchmark" | "finetune". Values: the live Popen object. +_running_procs: dict = {} +_cancelled_jobs: set = set() + def set_data_dir(path: Path) -> None: """Override data directory — used by tests.""" @@ -358,15 +363,22 @@ def run_benchmark(include_slow: bool = False): bufsize=1, cwd=str(_ROOT), ) - for line in proc.stdout: - line = line.rstrip() - if line: - yield f"data: {json.dumps({'type': 'progress', 'message': line})}\n\n" - proc.wait() - if proc.returncode == 0: - yield f"data: {json.dumps({'type': 'complete'})}\n\n" - else: - yield f"data: {json.dumps({'type': 'error', 'message': f'Process exited with code {proc.returncode}'})}\n\n" + _running_procs["benchmark"] = proc + try: + for line in proc.stdout: + line = line.rstrip() + if line: + yield f"data: {json.dumps({'type': 'progress', 'message': line})}\n\n" + proc.wait() + if proc.returncode == 0: + yield f"data: {json.dumps({'type': 'complete'})}\n\n" + elif "benchmark" in _cancelled_jobs: + _cancelled_jobs.discard("benchmark") + yield f"data: {json.dumps({'type': 'cancelled'})}\n\n" + else: + yield f"data: {json.dumps({'type': 'error', 'message': f'Process exited with code {proc.returncode}'})}\n\n" + finally: + _running_procs.pop("benchmark", None) except Exception as exc: yield f"data: {json.dumps({'type': 'error', 'message': str(exc)})}\n\n" @@ -443,15 +455,22 @@ def run_finetune_endpoint( cwd=str(_ROOT), env=proc_env, ) - for line in proc.stdout: - line = line.rstrip() - if line: - yield f"data: {json.dumps({'type': 'progress', 'message': line})}\n\n" - proc.wait() - if proc.returncode == 0: - yield f"data: {json.dumps({'type': 'complete'})}\n\n" - else: - yield f"data: {json.dumps({'type': 'error', 'message': f'Process exited with code {proc.returncode}'})}\n\n" + _running_procs["finetune"] = proc + try: + for line in proc.stdout: + line = line.rstrip() + if line: + yield f"data: {json.dumps({'type': 'progress', 'message': line})}\n\n" + proc.wait() + if proc.returncode == 0: + yield f"data: {json.dumps({'type': 'complete'})}\n\n" + elif "finetune" in _cancelled_jobs: + _cancelled_jobs.discard("finetune") + yield f"data: {json.dumps({'type': 'cancelled'})}\n\n" + else: + yield f"data: {json.dumps({'type': 'error', 'message': f'Process exited with code {proc.returncode}'})}\n\n" + finally: + _running_procs.pop("finetune", None) except Exception as exc: yield f"data: {json.dumps({'type': 'error', 'message': str(exc)})}\n\n" @@ -462,6 +481,36 @@ def run_finetune_endpoint( ) +@app.post("/api/benchmark/cancel") +def cancel_benchmark(): + """Kill the running benchmark subprocess. 404 if none is running.""" + proc = _running_procs.get("benchmark") + if proc is None: + raise HTTPException(404, "No benchmark is running") + _cancelled_jobs.add("benchmark") + proc.terminate() + try: + proc.wait(timeout=3) + except Exception: + proc.kill() + return {"status": "cancelled"} + + +@app.post("/api/finetune/cancel") +def cancel_finetune(): + """Kill the running fine-tune subprocess. 404 if none is running.""" + proc = _running_procs.get("finetune") + if proc is None: + raise HTTPException(404, "No finetune is running") + _cancelled_jobs.add("finetune") + proc.terminate() + try: + proc.wait(timeout=3) + except Exception: + proc.kill() + return {"status": "cancelled"} + + @app.get("/api/fetch/stream") def fetch_stream( accounts: str = Query(default=""), diff --git a/tests/test_api.py b/tests/test_api.py index 4f960b5..165ebc6 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -430,3 +430,123 @@ def test_finetune_run_passes_score_files_to_subprocess(client): # Paths are resolved to absolute — check filenames are present as substrings assert any("run1.jsonl" in arg for arg in captured_cmd) assert any("run2.jsonl" in arg for arg in captured_cmd) + + +# ---- Cancel endpoint tests ---- + +def test_benchmark_cancel_returns_404_when_not_running(client): + """POST /api/benchmark/cancel must return 404 if no benchmark is running.""" + from app import api as api_module + api_module._running_procs.pop("benchmark", None) + r = client.post("/api/benchmark/cancel") + assert r.status_code == 404 + + +def test_finetune_cancel_returns_404_when_not_running(client): + """POST /api/finetune/cancel must return 404 if no finetune is running.""" + from app import api as api_module + api_module._running_procs.pop("finetune", None) + r = client.post("/api/finetune/cancel") + assert r.status_code == 404 + + +def test_benchmark_cancel_terminates_running_process(client): + """POST /api/benchmark/cancel must call terminate() on the running process.""" + from unittest.mock import MagicMock + from app import api as api_module + + mock_proc = MagicMock() + mock_proc.wait = MagicMock() + api_module._running_procs["benchmark"] = mock_proc + + try: + r = client.post("/api/benchmark/cancel") + assert r.status_code == 200 + assert r.json()["status"] == "cancelled" + mock_proc.terminate.assert_called_once() + finally: + api_module._running_procs.pop("benchmark", None) + api_module._cancelled_jobs.discard("benchmark") + + +def test_finetune_cancel_terminates_running_process(client): + """POST /api/finetune/cancel must call terminate() on the running process.""" + from unittest.mock import MagicMock + from app import api as api_module + + mock_proc = MagicMock() + mock_proc.wait = MagicMock() + api_module._running_procs["finetune"] = mock_proc + + try: + r = client.post("/api/finetune/cancel") + assert r.status_code == 200 + assert r.json()["status"] == "cancelled" + mock_proc.terminate.assert_called_once() + finally: + api_module._running_procs.pop("finetune", None) + api_module._cancelled_jobs.discard("finetune") + + +def test_benchmark_cancel_kills_process_on_timeout(client): + """POST /api/benchmark/cancel must call kill() if the process does not exit within 3 s.""" + import subprocess + from unittest.mock import MagicMock + from app import api as api_module + + mock_proc = MagicMock() + mock_proc.wait.side_effect = subprocess.TimeoutExpired(cmd="benchmark", timeout=3) + api_module._running_procs["benchmark"] = mock_proc + + try: + r = client.post("/api/benchmark/cancel") + assert r.status_code == 200 + mock_proc.kill.assert_called_once() + finally: + api_module._running_procs.pop("benchmark", None) + api_module._cancelled_jobs.discard("benchmark") + + +def test_finetune_run_emits_cancelled_event(client): + """GET /api/finetune/run must emit cancelled (not error) when job was cancelled.""" + from unittest.mock import patch, MagicMock + from app import api as api_module + + mock_proc = MagicMock() + mock_proc.stdout = iter([]) + mock_proc.returncode = -15 # SIGTERM + + def mock_popen(cmd, **kwargs): + # Simulate cancel being called after process starts + api_module._cancelled_jobs.add("finetune") + return mock_proc + + try: + with patch("subprocess.Popen", side_effect=mock_popen): + r = client.get("/api/finetune/run?model=deberta-small&epochs=1") + assert '{"type": "cancelled"}' in r.text + assert '"type": "error"' not in r.text + finally: + api_module._cancelled_jobs.discard("finetune") + + +def test_benchmark_run_emits_cancelled_event(client): + """GET /api/benchmark/run must emit cancelled (not error) when job was cancelled.""" + from unittest.mock import patch, MagicMock + from app import api as api_module + + mock_proc = MagicMock() + mock_proc.stdout = iter([]) + mock_proc.returncode = -15 + + def mock_popen(cmd, **kwargs): + api_module._cancelled_jobs.add("benchmark") + return mock_proc + + try: + with patch("subprocess.Popen", side_effect=mock_popen): + r = client.get("/api/benchmark/run") + assert '{"type": "cancelled"}' in r.text + assert '"type": "error"' not in r.text + finally: + api_module._cancelled_jobs.discard("benchmark")