feat: add POST /ingest endpoint to corrections API with Bearer auth
Adds IngestRequest model and POST /api/sft/ingest route to app/data/corrections.py. Sibling CF products (Peregrine, Kiwi, etc.) can push pre-approved corrections via Bearer token auth (AVOCET_INGESTION_SECRET). Records land as status=approved in both sft_candidates.jsonl and sft_approved.jsonl immediately. 7 tests in tests/test_data_corrections.py cover 503 (secret unset), 401 (missing/malformed header), 403 (wrong secret), happy-path writes to both files, and optional label field.
This commit is contained in:
parent
0853ed7d56
commit
8fda821e15
2 changed files with 161 additions and 1 deletions
|
|
@ -11,12 +11,14 @@ from __future__ import annotations
|
|||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import uuid
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Literal
|
||||
|
||||
import yaml
|
||||
from fastapi import APIRouter, HTTPException
|
||||
from fastapi import APIRouter, Header, HTTPException
|
||||
from fastapi.responses import StreamingResponse
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
|
@ -333,3 +335,59 @@ def post_sft_config(payload: SftConfigPayload) -> dict:
|
|||
tmp.write_text(yaml.dump(raw, allow_unicode=True, sort_keys=False), encoding="utf-8")
|
||||
tmp.rename(f)
|
||||
return {"ok": True}
|
||||
|
||||
|
||||
# -- POST /ingest --------------------------------------------------------------
|
||||
|
||||
class IngestRequest(BaseModel):
|
||||
source: str # e.g. "peregrine", "kiwi"
|
||||
task_type: str # e.g. "email_classification", "recipe_suggestion"
|
||||
prompt: str # the prompt that was sent to the LLM
|
||||
response: str # the LLM's original response
|
||||
correction: str # the human-corrected response
|
||||
label: str | None = None # optional label/category
|
||||
|
||||
|
||||
@router.post("/ingest")
|
||||
def post_ingest(
|
||||
req: IngestRequest,
|
||||
authorization: str | None = Header(default=None),
|
||||
) -> dict:
|
||||
"""Ingest a correction from a sibling CF product.
|
||||
|
||||
Authentication: Authorization: Bearer <AVOCET_INGESTION_SECRET>
|
||||
|
||||
Creates a sft_candidates record with status='approved' (pre-approved by
|
||||
the calling product -- human review already happened upstream). Also writes
|
||||
to sft_approved.jsonl so it is immediately included in export counts.
|
||||
|
||||
Returns {"ok": True, "id": "<uuid>"}.
|
||||
"""
|
||||
expected_secret = os.environ.get("AVOCET_INGESTION_SECRET", "")
|
||||
if not expected_secret:
|
||||
raise HTTPException(503, "Ingestion not configured -- AVOCET_INGESTION_SECRET not set")
|
||||
|
||||
if not authorization or not authorization.startswith("Bearer "):
|
||||
raise HTTPException(401, "Missing or malformed Authorization header")
|
||||
|
||||
token = authorization.removeprefix("Bearer ").strip()
|
||||
if token != expected_secret:
|
||||
raise HTTPException(403, "Invalid ingestion secret")
|
||||
|
||||
record_id = str(uuid.uuid4())
|
||||
now = datetime.now(timezone.utc).isoformat()
|
||||
record = {
|
||||
"id": record_id,
|
||||
"source": req.source,
|
||||
"task_type": req.task_type,
|
||||
"status": "approved",
|
||||
"prompt_messages": [{"role": "user", "content": req.prompt}],
|
||||
"model_response": req.response,
|
||||
"corrected_response": req.correction,
|
||||
"label": req.label,
|
||||
"timestamp": now,
|
||||
"benchmark_run_id": None,
|
||||
}
|
||||
append_jsonl(_candidates_file(), record)
|
||||
append_jsonl(_approved_file(), record)
|
||||
return {"ok": True, "id": record_id}
|
||||
|
|
|
|||
102
tests/test_data_corrections.py
Normal file
102
tests/test_data_corrections.py
Normal file
|
|
@ -0,0 +1,102 @@
|
|||
"""Tests for app/data/corrections.py -- POST /api/sft/ingest.
|
||||
|
||||
The corrections router is mounted at prefix="/api/sft" via the app/sft.py
|
||||
backward-compat shim, so ingest lives at /api/sft/ingest.
|
||||
"""
|
||||
import json
|
||||
import pytest
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def reset_globals(tmp_path):
|
||||
from app.data import corrections as corr_module
|
||||
corr_module.set_data_dir(tmp_path)
|
||||
corr_module.set_config_dir(tmp_path)
|
||||
yield
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client():
|
||||
from app.api import app
|
||||
return TestClient(app)
|
||||
|
||||
|
||||
_VALID_PAYLOAD = {
|
||||
"source": "peregrine",
|
||||
"task_type": "email_classification",
|
||||
"prompt": "Classify this email: ...",
|
||||
"response": "skip",
|
||||
"correction": "action_required",
|
||||
"label": "action_required",
|
||||
}
|
||||
|
||||
_SECRET = "test-secret-abc123"
|
||||
|
||||
|
||||
def test_ingest_503_when_secret_not_configured(client, monkeypatch):
|
||||
monkeypatch.delenv("AVOCET_INGESTION_SECRET", raising=False)
|
||||
r = client.post("/api/sft/ingest", json=_VALID_PAYLOAD,
|
||||
headers={"Authorization": f"Bearer {_SECRET}"})
|
||||
assert r.status_code == 503
|
||||
|
||||
|
||||
def test_ingest_401_when_no_auth_header(client, monkeypatch):
|
||||
monkeypatch.setenv("AVOCET_INGESTION_SECRET", _SECRET)
|
||||
r = client.post("/api/sft/ingest", json=_VALID_PAYLOAD)
|
||||
assert r.status_code == 401
|
||||
|
||||
|
||||
def test_ingest_401_when_malformed_header(client, monkeypatch):
|
||||
monkeypatch.setenv("AVOCET_INGESTION_SECRET", _SECRET)
|
||||
r = client.post("/api/sft/ingest", json=_VALID_PAYLOAD,
|
||||
headers={"Authorization": "Token bad-format"})
|
||||
assert r.status_code == 401
|
||||
|
||||
|
||||
def test_ingest_403_when_wrong_secret(client, monkeypatch):
|
||||
monkeypatch.setenv("AVOCET_INGESTION_SECRET", _SECRET)
|
||||
r = client.post("/api/sft/ingest", json=_VALID_PAYLOAD,
|
||||
headers={"Authorization": "Bearer wrong-secret"})
|
||||
assert r.status_code == 403
|
||||
|
||||
|
||||
def test_ingest_creates_approved_record(client, monkeypatch, tmp_path):
|
||||
from app.data import corrections as corr_module
|
||||
monkeypatch.setenv("AVOCET_INGESTION_SECRET", _SECRET)
|
||||
corr_module.set_data_dir(tmp_path)
|
||||
r = client.post("/api/sft/ingest", json=_VALID_PAYLOAD,
|
||||
headers={"Authorization": f"Bearer {_SECRET}"})
|
||||
assert r.status_code == 200
|
||||
data = r.json()
|
||||
assert data["ok"] is True
|
||||
assert "id" in data
|
||||
candidates = corr_module.read_jsonl(corr_module._candidates_file())
|
||||
assert len(candidates) == 1
|
||||
rec = candidates[0]
|
||||
assert rec["status"] == "approved"
|
||||
assert rec["source"] == "peregrine"
|
||||
assert rec["corrected_response"] == "action_required"
|
||||
assert rec["id"] == data["id"]
|
||||
|
||||
|
||||
def test_ingest_also_writes_to_approved_file(client, monkeypatch, tmp_path):
|
||||
from app.data import corrections as corr_module
|
||||
monkeypatch.setenv("AVOCET_INGESTION_SECRET", _SECRET)
|
||||
corr_module.set_data_dir(tmp_path)
|
||||
r = client.post("/api/sft/ingest", json=_VALID_PAYLOAD,
|
||||
headers={"Authorization": f"Bearer {_SECRET}"})
|
||||
assert r.status_code == 200
|
||||
approved = corr_module.read_jsonl(corr_module._approved_file())
|
||||
assert len(approved) == 1
|
||||
assert approved[0]["id"] == r.json()["id"]
|
||||
|
||||
|
||||
def test_ingest_without_label_is_accepted(client, monkeypatch, tmp_path):
|
||||
from app.data import corrections as corr_module
|
||||
monkeypatch.setenv("AVOCET_INGESTION_SECRET", _SECRET)
|
||||
corr_module.set_data_dir(tmp_path)
|
||||
payload = {**_VALID_PAYLOAD, "label": None}
|
||||
r = client.post("/api/sft/ingest", json=payload,
|
||||
headers={"Authorization": f"Bearer {_SECRET}"})
|
||||
assert r.status_code == 200
|
||||
Loading…
Reference in a new issue