feat: sft_import.py — run discovery and JSONL deduplication

This commit is contained in:
pyr0ball 2026-04-08 07:13:37 -07:00
parent 25880e377d
commit 03dac57fd9
2 changed files with 177 additions and 0 deletions

88
scripts/sft_import.py Normal file
View file

@ -0,0 +1,88 @@
"""Avocet — SFT candidate run discovery and JSONL import.
No FastAPI dependency pure Python file operations.
Used by app/sft.py endpoints and can be run standalone.
"""
from __future__ import annotations
import json
from pathlib import Path
_CANDIDATES_FILENAME = "sft_candidates.jsonl"
def discover_runs(bench_results_dir: Path) -> list[dict]:
"""Return one entry per run subdirectory that contains sft_candidates.jsonl.
Sorted newest-first by directory name (directories are named YYYY-MM-DD-HHMMSS
by the cf-orch benchmark harness, so lexicographic order is chronological).
Each entry: {run_id, timestamp, candidate_count, sft_path}
"""
if not bench_results_dir.exists() or not bench_results_dir.is_dir():
return []
runs = []
for subdir in bench_results_dir.iterdir():
if not subdir.is_dir():
continue
sft_path = subdir / _CANDIDATES_FILENAME
if not sft_path.exists():
continue
records = _read_jsonl(sft_path)
runs.append({
"run_id": subdir.name,
"timestamp": subdir.name,
"candidate_count": len(records),
"sft_path": sft_path,
})
runs.sort(key=lambda r: r["run_id"], reverse=True)
return runs
def import_run(sft_path: Path, data_dir: Path) -> dict[str, int]:
"""Append records from sft_path into data_dir/sft_candidates.jsonl.
Deduplicates on the `id` field records whose id already exists in the
destination file are skipped silently. Records missing an `id` field are
also skipped (malformed input from a partial benchmark write).
Returns {imported: N, skipped: M}.
"""
dest = data_dir / _CANDIDATES_FILENAME
existing = _read_jsonl(dest)
existing_ids = {r["id"] for r in existing if "id" in r}
new_records: list[dict] = []
skipped = 0
for record in _read_jsonl(sft_path):
if "id" not in record:
continue # malformed — skip without crashing
if record["id"] in existing_ids:
skipped += 1
continue
new_records.append(record)
existing_ids.add(record["id"])
if new_records:
dest.parent.mkdir(parents=True, exist_ok=True)
with open(dest, "a", encoding="utf-8") as fh:
for r in new_records:
fh.write(json.dumps(r) + "\n")
return {"imported": len(new_records), "skipped": skipped}
def _read_jsonl(path: Path) -> list[dict]:
"""Read a JSONL file, returning valid records. Skips blank lines and malformed JSON."""
if not path.exists():
return []
records: list[dict] = []
for line in path.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line:
continue
try:
records.append(json.loads(line))
except json.JSONDecodeError:
pass
return records

89
tests/test_sft_import.py Normal file
View file

@ -0,0 +1,89 @@
"""Unit tests for scripts/sft_import.py — run discovery and JSONL deduplication."""
import json
import pytest
from pathlib import Path
def _write_candidates(path: Path, records: list[dict]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text("\n".join(json.dumps(r) for r in records) + "\n", encoding="utf-8")
def _make_record(id: str, run_id: str = "run1") -> dict:
return {
"id": id, "source": "cf-orch-benchmark",
"benchmark_run_id": run_id, "timestamp": "2026-04-07T10:00:00Z",
"status": "needs_review", "prompt_messages": [],
"model_response": "bad", "corrected_response": None,
"quality_score": 0.3, "failure_reason": "missing patterns",
"task_id": "code-fn", "task_type": "code", "task_name": "Code: fn",
"model_id": "Qwen/Qwen2.5-3B", "model_name": "Qwen2.5-3B",
"node_id": "heimdall", "gpu_id": 0, "tokens_per_sec": 38.4,
}
def test_discover_runs_empty_when_dir_missing(tmp_path):
from scripts.sft_import import discover_runs
result = discover_runs(tmp_path / "nonexistent")
assert result == []
def test_discover_runs_returns_runs(tmp_path):
from scripts.sft_import import discover_runs
run_dir = tmp_path / "2026-04-07-143022"
_write_candidates(run_dir / "sft_candidates.jsonl", [_make_record("a"), _make_record("b")])
result = discover_runs(tmp_path)
assert len(result) == 1
assert result[0]["run_id"] == "2026-04-07-143022"
assert result[0]["candidate_count"] == 2
assert "sft_path" in result[0]
def test_discover_runs_skips_dirs_without_sft_file(tmp_path):
from scripts.sft_import import discover_runs
(tmp_path / "2026-04-07-no-sft").mkdir()
result = discover_runs(tmp_path)
assert result == []
def test_discover_runs_sorted_newest_first(tmp_path):
from scripts.sft_import import discover_runs
for name in ["2026-04-05-120000", "2026-04-07-143022", "2026-04-06-090000"]:
run_dir = tmp_path / name
_write_candidates(run_dir / "sft_candidates.jsonl", [_make_record("x")])
result = discover_runs(tmp_path)
assert [r["run_id"] for r in result] == [
"2026-04-07-143022", "2026-04-06-090000", "2026-04-05-120000"
]
def test_import_run_imports_new_records(tmp_path):
from scripts.sft_import import import_run
sft_path = tmp_path / "run1" / "sft_candidates.jsonl"
_write_candidates(sft_path, [_make_record("a"), _make_record("b")])
result = import_run(sft_path, tmp_path)
assert result == {"imported": 2, "skipped": 0}
dest = tmp_path / "sft_candidates.jsonl"
lines = [json.loads(l) for l in dest.read_text().splitlines() if l.strip()]
assert len(lines) == 2
def test_import_run_deduplicates_on_id(tmp_path):
from scripts.sft_import import import_run
sft_path = tmp_path / "run1" / "sft_candidates.jsonl"
_write_candidates(sft_path, [_make_record("a"), _make_record("b")])
import_run(sft_path, tmp_path)
result = import_run(sft_path, tmp_path) # second import
assert result == {"imported": 0, "skipped": 2}
dest = tmp_path / "sft_candidates.jsonl"
lines = [l for l in dest.read_text().splitlines() if l.strip()]
assert len(lines) == 2 # no duplicates
def test_import_run_skips_records_missing_id(tmp_path):
from scripts.sft_import import import_run
sft_path = tmp_path / "run1" / "sft_candidates.jsonl"
bad = {"source": "cf-orch-benchmark", "status": "needs_review"} # no id
_write_candidates(sft_path, [bad, _make_record("a")])
result = import_run(sft_path, tmp_path)
assert result == {"imported": 1, "skipped": 0}