feat(streaming): add POST /recipes/stream-token endpoint
This commit is contained in:
parent
2547f80893
commit
63517d135b
2 changed files with 156 additions and 1 deletions
|
|
@ -21,7 +21,11 @@ from app.models.schemas.recipe import (
|
||||||
RecipeResult,
|
RecipeResult,
|
||||||
RecipeSuggestion,
|
RecipeSuggestion,
|
||||||
RoleCandidatesResponse,
|
RoleCandidatesResponse,
|
||||||
|
StreamTokenRequest,
|
||||||
|
StreamTokenResponse,
|
||||||
)
|
)
|
||||||
|
from app.services.coordinator_proxy import CoordinatorError, coordinator_authorize
|
||||||
|
from app.api.endpoints.imitate import _build_recipe_prompt
|
||||||
from app.services.recipe.assembly_recipes import (
|
from app.services.recipe.assembly_recipes import (
|
||||||
build_from_selection,
|
build_from_selection,
|
||||||
get_role_candidates,
|
get_role_candidates,
|
||||||
|
|
@ -60,6 +64,44 @@ def _suggest_in_thread(db_path: Path, req: RecipeRequest) -> RecipeResult:
|
||||||
store.close()
|
store.close()
|
||||||
|
|
||||||
|
|
||||||
|
def _build_stream_prompt(db_path: Path, level: int) -> str:
|
||||||
|
"""Fetch pantry + user settings from DB and build the recipe prompt.
|
||||||
|
|
||||||
|
Runs in a thread (called via asyncio.to_thread) so it can use sync Store.
|
||||||
|
"""
|
||||||
|
import datetime
|
||||||
|
|
||||||
|
store = Store(db_path)
|
||||||
|
try:
|
||||||
|
items = store.list_inventory(status="available")
|
||||||
|
pantry_names = [i["product_name"] for i in items if i.get("product_name")]
|
||||||
|
|
||||||
|
today = datetime.date.today()
|
||||||
|
expiring_names = [
|
||||||
|
i["product_name"]
|
||||||
|
for i in items
|
||||||
|
if i.get("product_name")
|
||||||
|
and i.get("expiry_date")
|
||||||
|
and (datetime.date.fromisoformat(i["expiry_date"]) - today).days <= 3
|
||||||
|
]
|
||||||
|
|
||||||
|
settings: dict = {}
|
||||||
|
try:
|
||||||
|
rows = store.conn.execute("SELECT key, value FROM user_settings").fetchall()
|
||||||
|
settings = {r["key"]: r["value"] for r in rows}
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
constraints_raw = settings.get("dietary_constraints", "")
|
||||||
|
constraints = [c.strip() for c in constraints_raw.split(",") if c.strip()] if constraints_raw else []
|
||||||
|
allergies_raw = settings.get("allergies", "")
|
||||||
|
allergies = [a.strip() for a in allergies_raw.split(",") if a.strip()] if allergies_raw else []
|
||||||
|
|
||||||
|
return _build_recipe_prompt(pantry_names, expiring_names, constraints, allergies, level)
|
||||||
|
finally:
|
||||||
|
store.close()
|
||||||
|
|
||||||
|
|
||||||
async def _enqueue_recipe_job(session: CloudUser, req: RecipeRequest):
|
async def _enqueue_recipe_job(session: CloudUser, req: RecipeRequest):
|
||||||
"""Queue an async recipe_llm job and return 202 with job_id.
|
"""Queue an async recipe_llm job and return 202 with job_id.
|
||||||
|
|
||||||
|
|
@ -145,6 +187,42 @@ async def suggest_recipes(
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/stream-token", response_model=StreamTokenResponse)
|
||||||
|
async def get_stream_token(
|
||||||
|
req: StreamTokenRequest,
|
||||||
|
session: CloudUser = Depends(get_session),
|
||||||
|
) -> StreamTokenResponse:
|
||||||
|
"""Issue a one-time stream token for LLM recipe generation.
|
||||||
|
|
||||||
|
Tier-gated (Paid or BYOK). Builds the prompt from pantry + user settings,
|
||||||
|
then calls the cf-orch coordinator to obtain a stream URL. Returns
|
||||||
|
immediately — the frontend opens EventSource to the stream URL directly.
|
||||||
|
"""
|
||||||
|
if not can_use("recipe_suggestions", session.tier, session.has_byok):
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=403,
|
||||||
|
detail="Streaming recipe generation requires Paid tier or a configured LLM backend.",
|
||||||
|
)
|
||||||
|
if req.level == 4 and not req.wildcard_confirmed:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=400,
|
||||||
|
detail="Level 4 (Wildcard) streaming requires wildcard_confirmed=true.",
|
||||||
|
)
|
||||||
|
|
||||||
|
prompt = await asyncio.to_thread(_build_stream_prompt, session.db, req.level)
|
||||||
|
|
||||||
|
try:
|
||||||
|
result = await coordinator_authorize(prompt=prompt, caller="kiwi-recipe", ttl_s=300)
|
||||||
|
except CoordinatorError as exc:
|
||||||
|
raise HTTPException(status_code=exc.status_code, detail=str(exc))
|
||||||
|
|
||||||
|
return StreamTokenResponse(
|
||||||
|
stream_url=result.stream_url,
|
||||||
|
token=result.token,
|
||||||
|
expires_in_s=result.expires_in_s,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@router.get("/jobs/{job_id}", response_model=RecipeJobStatus)
|
@router.get("/jobs/{job_id}", response_model=RecipeJobStatus)
|
||||||
async def get_recipe_job_status(
|
async def get_recipe_job_status(
|
||||||
job_id: str,
|
job_id: str,
|
||||||
|
|
|
||||||
|
|
@ -1,10 +1,30 @@
|
||||||
"""Tests for POST /api/v1/recipes/stream-token — coordinator proxy integration."""
|
"""Tests for POST /api/v1/recipes/stream-token — coordinator proxy integration."""
|
||||||
import pytest
|
from pathlib import Path
|
||||||
from unittest.mock import AsyncMock, patch
|
from unittest.mock import AsyncMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from fastapi.testclient import TestClient
|
||||||
|
|
||||||
|
from app.cloud_session import CloudUser, get_session
|
||||||
|
from app.main import app
|
||||||
from app.models.schemas.recipe import StreamTokenRequest, StreamTokenResponse
|
from app.models.schemas.recipe import StreamTokenRequest, StreamTokenResponse
|
||||||
|
|
||||||
|
|
||||||
|
def _make_session(tier: str = "paid", has_byok: bool = False) -> CloudUser:
|
||||||
|
return CloudUser(
|
||||||
|
user_id="test-user",
|
||||||
|
tier=tier,
|
||||||
|
db=Path("/tmp/kiwi_test.db"),
|
||||||
|
has_byok=has_byok,
|
||||||
|
license_key=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _client(tier: str = "paid", has_byok: bool = False) -> TestClient:
|
||||||
|
app.dependency_overrides[get_session] = lambda: _make_session(tier=tier, has_byok=has_byok)
|
||||||
|
return TestClient(app)
|
||||||
|
|
||||||
|
|
||||||
def test_coordinator_authorize_missing_url(monkeypatch):
|
def test_coordinator_authorize_missing_url(monkeypatch):
|
||||||
"""coordinator_authorize raises RuntimeError when COORDINATOR_URL is unset."""
|
"""coordinator_authorize raises RuntimeError when COORDINATOR_URL is unset."""
|
||||||
monkeypatch.delenv("COORDINATOR_URL", raising=False)
|
monkeypatch.delenv("COORDINATOR_URL", raising=False)
|
||||||
|
|
@ -32,3 +52,60 @@ def test_stream_token_response():
|
||||||
)
|
)
|
||||||
assert resp.stream_url.startswith("http")
|
assert resp.stream_url.startswith("http")
|
||||||
assert resp.expires_in_s == 60
|
assert resp.expires_in_s == 60
|
||||||
|
|
||||||
|
|
||||||
|
def test_stream_token_tier_gate():
|
||||||
|
"""Free-tier session is rejected with 403."""
|
||||||
|
client = _client(tier="free")
|
||||||
|
try:
|
||||||
|
resp = client.post("/api/v1/recipes/stream-token", json={"level": 3})
|
||||||
|
assert resp.status_code == 403
|
||||||
|
finally:
|
||||||
|
app.dependency_overrides.clear()
|
||||||
|
|
||||||
|
|
||||||
|
def test_stream_token_level4_requires_confirmation():
|
||||||
|
"""Level 4 without wildcard_confirmed=true returns 400."""
|
||||||
|
client = _client(tier="paid")
|
||||||
|
try:
|
||||||
|
resp = client.post("/api/v1/recipes/stream-token", json={"level": 4, "wildcard_confirmed": False})
|
||||||
|
assert resp.status_code == 400
|
||||||
|
finally:
|
||||||
|
app.dependency_overrides.clear()
|
||||||
|
|
||||||
|
|
||||||
|
@patch("app.api.endpoints.recipes.coordinator_authorize", new_callable=AsyncMock)
|
||||||
|
@patch("app.api.endpoints.recipes._build_stream_prompt", return_value="mock prompt")
|
||||||
|
def test_stream_token_success_level3(mock_prompt, mock_authorize):
|
||||||
|
"""Paid tier, level 3 — returns stream_url and token."""
|
||||||
|
from app.services.coordinator_proxy import StreamTokenResult
|
||||||
|
mock_authorize.return_value = StreamTokenResult(
|
||||||
|
stream_url="http://10.1.10.71:7700/proxy/stream",
|
||||||
|
token="test-token-abc",
|
||||||
|
expires_in_s=60,
|
||||||
|
)
|
||||||
|
client = _client(tier="paid")
|
||||||
|
try:
|
||||||
|
resp = client.post("/api/v1/recipes/stream-token", json={"level": 3})
|
||||||
|
assert resp.status_code == 200
|
||||||
|
data = resp.json()
|
||||||
|
assert "stream_url" in data
|
||||||
|
assert "token" in data
|
||||||
|
assert data["expires_in_s"] == 60
|
||||||
|
mock_authorize.assert_awaited_once()
|
||||||
|
finally:
|
||||||
|
app.dependency_overrides.clear()
|
||||||
|
|
||||||
|
|
||||||
|
@patch("app.api.endpoints.recipes.coordinator_authorize", new_callable=AsyncMock)
|
||||||
|
@patch("app.api.endpoints.recipes._build_stream_prompt", return_value="mock prompt")
|
||||||
|
def test_stream_token_coordinator_unavailable(mock_prompt, mock_authorize):
|
||||||
|
"""CoordinatorError maps to 503."""
|
||||||
|
from app.services.coordinator_proxy import CoordinatorError
|
||||||
|
mock_authorize.side_effect = CoordinatorError("No GPU", status_code=503)
|
||||||
|
client = _client(tier="paid")
|
||||||
|
try:
|
||||||
|
resp = client.post("/api/v1/recipes/stream-token", json={"level": 3})
|
||||||
|
assert resp.status_code == 503
|
||||||
|
finally:
|
||||||
|
app.dependency_overrides.clear()
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue