From 63517d135b5cf11bd110402c029ce6a44300b8dd Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Fri, 24 Apr 2026 10:22:30 -0700 Subject: [PATCH] feat(streaming): add POST /recipes/stream-token endpoint --- app/api/endpoints/recipes.py | 78 +++++++++++++++++++++++++++++++++ tests/api/test_stream_token.py | 79 +++++++++++++++++++++++++++++++++- 2 files changed, 156 insertions(+), 1 deletion(-) diff --git a/app/api/endpoints/recipes.py b/app/api/endpoints/recipes.py index 1b87606..4f33842 100644 --- a/app/api/endpoints/recipes.py +++ b/app/api/endpoints/recipes.py @@ -21,7 +21,11 @@ from app.models.schemas.recipe import ( RecipeResult, RecipeSuggestion, RoleCandidatesResponse, + StreamTokenRequest, + StreamTokenResponse, ) +from app.services.coordinator_proxy import CoordinatorError, coordinator_authorize +from app.api.endpoints.imitate import _build_recipe_prompt from app.services.recipe.assembly_recipes import ( build_from_selection, get_role_candidates, @@ -60,6 +64,44 @@ def _suggest_in_thread(db_path: Path, req: RecipeRequest) -> RecipeResult: store.close() +def _build_stream_prompt(db_path: Path, level: int) -> str: + """Fetch pantry + user settings from DB and build the recipe prompt. + + Runs in a thread (called via asyncio.to_thread) so it can use sync Store. + """ + import datetime + + store = Store(db_path) + try: + items = store.list_inventory(status="available") + pantry_names = [i["product_name"] for i in items if i.get("product_name")] + + today = datetime.date.today() + expiring_names = [ + i["product_name"] + for i in items + if i.get("product_name") + and i.get("expiry_date") + and (datetime.date.fromisoformat(i["expiry_date"]) - today).days <= 3 + ] + + settings: dict = {} + try: + rows = store.conn.execute("SELECT key, value FROM user_settings").fetchall() + settings = {r["key"]: r["value"] for r in rows} + except Exception: + pass + + constraints_raw = settings.get("dietary_constraints", "") + constraints = [c.strip() for c in constraints_raw.split(",") if c.strip()] if constraints_raw else [] + allergies_raw = settings.get("allergies", "") + allergies = [a.strip() for a in allergies_raw.split(",") if a.strip()] if allergies_raw else [] + + return _build_recipe_prompt(pantry_names, expiring_names, constraints, allergies, level) + finally: + store.close() + + async def _enqueue_recipe_job(session: CloudUser, req: RecipeRequest): """Queue an async recipe_llm job and return 202 with job_id. @@ -145,6 +187,42 @@ async def suggest_recipes( return result +@router.post("/stream-token", response_model=StreamTokenResponse) +async def get_stream_token( + req: StreamTokenRequest, + session: CloudUser = Depends(get_session), +) -> StreamTokenResponse: + """Issue a one-time stream token for LLM recipe generation. + + Tier-gated (Paid or BYOK). Builds the prompt from pantry + user settings, + then calls the cf-orch coordinator to obtain a stream URL. Returns + immediately — the frontend opens EventSource to the stream URL directly. + """ + if not can_use("recipe_suggestions", session.tier, session.has_byok): + raise HTTPException( + status_code=403, + detail="Streaming recipe generation requires Paid tier or a configured LLM backend.", + ) + if req.level == 4 and not req.wildcard_confirmed: + raise HTTPException( + status_code=400, + detail="Level 4 (Wildcard) streaming requires wildcard_confirmed=true.", + ) + + prompt = await asyncio.to_thread(_build_stream_prompt, session.db, req.level) + + try: + result = await coordinator_authorize(prompt=prompt, caller="kiwi-recipe", ttl_s=300) + except CoordinatorError as exc: + raise HTTPException(status_code=exc.status_code, detail=str(exc)) + + return StreamTokenResponse( + stream_url=result.stream_url, + token=result.token, + expires_in_s=result.expires_in_s, + ) + + @router.get("/jobs/{job_id}", response_model=RecipeJobStatus) async def get_recipe_job_status( job_id: str, diff --git a/tests/api/test_stream_token.py b/tests/api/test_stream_token.py index 894d91e..1d908c1 100644 --- a/tests/api/test_stream_token.py +++ b/tests/api/test_stream_token.py @@ -1,10 +1,30 @@ """Tests for POST /api/v1/recipes/stream-token — coordinator proxy integration.""" -import pytest +from pathlib import Path from unittest.mock import AsyncMock, patch +import pytest +from fastapi.testclient import TestClient + +from app.cloud_session import CloudUser, get_session +from app.main import app from app.models.schemas.recipe import StreamTokenRequest, StreamTokenResponse +def _make_session(tier: str = "paid", has_byok: bool = False) -> CloudUser: + return CloudUser( + user_id="test-user", + tier=tier, + db=Path("/tmp/kiwi_test.db"), + has_byok=has_byok, + license_key=None, + ) + + +def _client(tier: str = "paid", has_byok: bool = False) -> TestClient: + app.dependency_overrides[get_session] = lambda: _make_session(tier=tier, has_byok=has_byok) + return TestClient(app) + + def test_coordinator_authorize_missing_url(monkeypatch): """coordinator_authorize raises RuntimeError when COORDINATOR_URL is unset.""" monkeypatch.delenv("COORDINATOR_URL", raising=False) @@ -32,3 +52,60 @@ def test_stream_token_response(): ) assert resp.stream_url.startswith("http") assert resp.expires_in_s == 60 + + +def test_stream_token_tier_gate(): + """Free-tier session is rejected with 403.""" + client = _client(tier="free") + try: + resp = client.post("/api/v1/recipes/stream-token", json={"level": 3}) + assert resp.status_code == 403 + finally: + app.dependency_overrides.clear() + + +def test_stream_token_level4_requires_confirmation(): + """Level 4 without wildcard_confirmed=true returns 400.""" + client = _client(tier="paid") + try: + resp = client.post("/api/v1/recipes/stream-token", json={"level": 4, "wildcard_confirmed": False}) + assert resp.status_code == 400 + finally: + app.dependency_overrides.clear() + + +@patch("app.api.endpoints.recipes.coordinator_authorize", new_callable=AsyncMock) +@patch("app.api.endpoints.recipes._build_stream_prompt", return_value="mock prompt") +def test_stream_token_success_level3(mock_prompt, mock_authorize): + """Paid tier, level 3 — returns stream_url and token.""" + from app.services.coordinator_proxy import StreamTokenResult + mock_authorize.return_value = StreamTokenResult( + stream_url="http://10.1.10.71:7700/proxy/stream", + token="test-token-abc", + expires_in_s=60, + ) + client = _client(tier="paid") + try: + resp = client.post("/api/v1/recipes/stream-token", json={"level": 3}) + assert resp.status_code == 200 + data = resp.json() + assert "stream_url" in data + assert "token" in data + assert data["expires_in_s"] == 60 + mock_authorize.assert_awaited_once() + finally: + app.dependency_overrides.clear() + + +@patch("app.api.endpoints.recipes.coordinator_authorize", new_callable=AsyncMock) +@patch("app.api.endpoints.recipes._build_stream_prompt", return_value="mock prompt") +def test_stream_token_coordinator_unavailable(mock_prompt, mock_authorize): + """CoordinatorError maps to 503.""" + from app.services.coordinator_proxy import CoordinatorError + mock_authorize.side_effect = CoordinatorError("No GPU", status_code=503) + client = _client(tier="paid") + try: + resp = client.post("/api/v1/recipes/stream-token", json={"level": 3}) + assert resp.status_code == 503 + finally: + app.dependency_overrides.clear()