From 8fda821e15abb4efb1e6bdeef4f3bf82238ac0d8 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Sat, 2 May 2026 09:07:10 -0700 Subject: [PATCH] feat: add POST /ingest endpoint to corrections API with Bearer auth Adds IngestRequest model and POST /api/sft/ingest route to app/data/corrections.py. Sibling CF products (Peregrine, Kiwi, etc.) can push pre-approved corrections via Bearer token auth (AVOCET_INGESTION_SECRET). Records land as status=approved in both sft_candidates.jsonl and sft_approved.jsonl immediately. 7 tests in tests/test_data_corrections.py cover 503 (secret unset), 401 (missing/malformed header), 403 (wrong secret), happy-path writes to both files, and optional label field. --- app/data/corrections.py | 60 ++++++++++++++++++- tests/test_data_corrections.py | 102 +++++++++++++++++++++++++++++++++ 2 files changed, 161 insertions(+), 1 deletion(-) create mode 100644 tests/test_data_corrections.py diff --git a/app/data/corrections.py b/app/data/corrections.py index ca54342..12f821c 100644 --- a/app/data/corrections.py +++ b/app/data/corrections.py @@ -11,12 +11,14 @@ from __future__ import annotations import json import logging +import os +import uuid from datetime import datetime, timezone from pathlib import Path from typing import Literal import yaml -from fastapi import APIRouter, HTTPException +from fastapi import APIRouter, Header, HTTPException from fastapi.responses import StreamingResponse from pydantic import BaseModel @@ -333,3 +335,59 @@ def post_sft_config(payload: SftConfigPayload) -> dict: tmp.write_text(yaml.dump(raw, allow_unicode=True, sort_keys=False), encoding="utf-8") tmp.rename(f) return {"ok": True} + + +# -- POST /ingest -------------------------------------------------------------- + +class IngestRequest(BaseModel): + source: str # e.g. "peregrine", "kiwi" + task_type: str # e.g. "email_classification", "recipe_suggestion" + prompt: str # the prompt that was sent to the LLM + response: str # the LLM's original response + correction: str # the human-corrected response + label: str | None = None # optional label/category + + +@router.post("/ingest") +def post_ingest( + req: IngestRequest, + authorization: str | None = Header(default=None), +) -> dict: + """Ingest a correction from a sibling CF product. + + Authentication: Authorization: Bearer + + Creates a sft_candidates record with status='approved' (pre-approved by + the calling product -- human review already happened upstream). Also writes + to sft_approved.jsonl so it is immediately included in export counts. + + Returns {"ok": True, "id": ""}. + """ + expected_secret = os.environ.get("AVOCET_INGESTION_SECRET", "") + if not expected_secret: + raise HTTPException(503, "Ingestion not configured -- AVOCET_INGESTION_SECRET not set") + + if not authorization or not authorization.startswith("Bearer "): + raise HTTPException(401, "Missing or malformed Authorization header") + + token = authorization.removeprefix("Bearer ").strip() + if token != expected_secret: + raise HTTPException(403, "Invalid ingestion secret") + + record_id = str(uuid.uuid4()) + now = datetime.now(timezone.utc).isoformat() + record = { + "id": record_id, + "source": req.source, + "task_type": req.task_type, + "status": "approved", + "prompt_messages": [{"role": "user", "content": req.prompt}], + "model_response": req.response, + "corrected_response": req.correction, + "label": req.label, + "timestamp": now, + "benchmark_run_id": None, + } + append_jsonl(_candidates_file(), record) + append_jsonl(_approved_file(), record) + return {"ok": True, "id": record_id} diff --git a/tests/test_data_corrections.py b/tests/test_data_corrections.py new file mode 100644 index 0000000..679403b --- /dev/null +++ b/tests/test_data_corrections.py @@ -0,0 +1,102 @@ +"""Tests for app/data/corrections.py -- POST /api/sft/ingest. + +The corrections router is mounted at prefix="/api/sft" via the app/sft.py +backward-compat shim, so ingest lives at /api/sft/ingest. +""" +import json +import pytest +from fastapi.testclient import TestClient + + +@pytest.fixture(autouse=True) +def reset_globals(tmp_path): + from app.data import corrections as corr_module + corr_module.set_data_dir(tmp_path) + corr_module.set_config_dir(tmp_path) + yield + + +@pytest.fixture +def client(): + from app.api import app + return TestClient(app) + + +_VALID_PAYLOAD = { + "source": "peregrine", + "task_type": "email_classification", + "prompt": "Classify this email: ...", + "response": "skip", + "correction": "action_required", + "label": "action_required", +} + +_SECRET = "test-secret-abc123" + + +def test_ingest_503_when_secret_not_configured(client, monkeypatch): + monkeypatch.delenv("AVOCET_INGESTION_SECRET", raising=False) + r = client.post("/api/sft/ingest", json=_VALID_PAYLOAD, + headers={"Authorization": f"Bearer {_SECRET}"}) + assert r.status_code == 503 + + +def test_ingest_401_when_no_auth_header(client, monkeypatch): + monkeypatch.setenv("AVOCET_INGESTION_SECRET", _SECRET) + r = client.post("/api/sft/ingest", json=_VALID_PAYLOAD) + assert r.status_code == 401 + + +def test_ingest_401_when_malformed_header(client, monkeypatch): + monkeypatch.setenv("AVOCET_INGESTION_SECRET", _SECRET) + r = client.post("/api/sft/ingest", json=_VALID_PAYLOAD, + headers={"Authorization": "Token bad-format"}) + assert r.status_code == 401 + + +def test_ingest_403_when_wrong_secret(client, monkeypatch): + monkeypatch.setenv("AVOCET_INGESTION_SECRET", _SECRET) + r = client.post("/api/sft/ingest", json=_VALID_PAYLOAD, + headers={"Authorization": "Bearer wrong-secret"}) + assert r.status_code == 403 + + +def test_ingest_creates_approved_record(client, monkeypatch, tmp_path): + from app.data import corrections as corr_module + monkeypatch.setenv("AVOCET_INGESTION_SECRET", _SECRET) + corr_module.set_data_dir(tmp_path) + r = client.post("/api/sft/ingest", json=_VALID_PAYLOAD, + headers={"Authorization": f"Bearer {_SECRET}"}) + assert r.status_code == 200 + data = r.json() + assert data["ok"] is True + assert "id" in data + candidates = corr_module.read_jsonl(corr_module._candidates_file()) + assert len(candidates) == 1 + rec = candidates[0] + assert rec["status"] == "approved" + assert rec["source"] == "peregrine" + assert rec["corrected_response"] == "action_required" + assert rec["id"] == data["id"] + + +def test_ingest_also_writes_to_approved_file(client, monkeypatch, tmp_path): + from app.data import corrections as corr_module + monkeypatch.setenv("AVOCET_INGESTION_SECRET", _SECRET) + corr_module.set_data_dir(tmp_path) + r = client.post("/api/sft/ingest", json=_VALID_PAYLOAD, + headers={"Authorization": f"Bearer {_SECRET}"}) + assert r.status_code == 200 + approved = corr_module.read_jsonl(corr_module._approved_file()) + assert len(approved) == 1 + assert approved[0]["id"] == r.json()["id"] + + +def test_ingest_without_label_is_accepted(client, monkeypatch, tmp_path): + from app.data import corrections as corr_module + monkeypatch.setenv("AVOCET_INGESTION_SECRET", _SECRET) + corr_module.set_data_dir(tmp_path) + payload = {**_VALID_PAYLOAD, "label": None} + r = client.post("/api/sft/ingest", json=payload, + headers={"Authorization": f"Bearer {_SECRET}"}) + assert r.status_code == 200