feat(streaming): add native SSE fallback for L3/L4 recipe generation (closes #126)

Two-phase streaming architecture:
  Phase 1 (sync thread): IngredientClassifier builds element profiles +
    gap list from SQLite — thread-safe, no async context needed
  Phase 2 (async): LLMRecipeGenerator.stream_generate() yields tokens via
    cf-orch warm vllm (existing /stream-token path) or AsyncOpenAI against
    Ollama if the coordinator is unavailable

Backend (app/services/recipe/llm_recipe.py):
  - stream_generate() async generator; _try_alloc_for_stream() sync helper
  - _stream_openai_compat() static method handles __auto__ model resolution
  - LLMRecipeGenerator(None) is safe for streaming (store not used)

Endpoint (app/api/endpoints/recipes.py):
  - ?stream=true on POST /recipes/suggest returns StreamingResponse
  - X-Accel-Buffering: no prevents nginx buffering without nginx.conf edits

Frontend (api.ts, recipes.ts, RecipesView.vue):
  - suggestRecipeStream() uses fetch + ReadableStream (POST; EventSource
    only supports GET)
  - streamSuggest() action in recipes store builds request internally
  - RecipesView.streamRecipe() silently falls back to native SSE when
    cf-orch token fetch fails rather than surfacing an error
This commit is contained in:
pyr0ball 2026-05-11 11:32:54 -07:00
parent 04dbdddbad
commit e57f46f4b6
6 changed files with 257 additions and 37 deletions

View file

@ -6,7 +6,9 @@ import logging
from pathlib import Path from pathlib import Path
from typing import Annotated from typing import Annotated
import json as _json_mod
from fastapi import APIRouter, Depends, HTTPException, Query from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import StreamingResponse
from app.cloud_session import CloudUser, _auth_label, get_session from app.cloud_session import CloudUser, _auth_label, get_session
@ -103,6 +105,39 @@ def _build_stream_prompt(db_path: Path, level: int) -> str:
store.close() store.close()
async def _stream_recipe_sse(db_path: Path, req: RecipeRequest):
"""Async generator that yields SSE events for a streaming recipe request.
Phase 1 (thread): classify pantry items using a temporary Store.
Phase 2 (async): stream tokens from LLM via LLMRecipeGenerator.stream_generate().
"""
def _prep(db_path: Path) -> tuple[list, list[str]]:
from app.services.recipe.element_classifier import IngredientClassifier
store = Store(db_path)
try:
classifier = IngredientClassifier(store)
profiles = classifier.classify_batch(req.pantry_items)
gaps = classifier.identify_gaps(profiles)
return profiles, gaps
finally:
store.close()
try:
profiles, gaps = await asyncio.to_thread(_prep, db_path)
except Exception as exc:
yield f"data: {_json_mod.dumps({'error': str(exc)})}\n\n"
return
from app.services.recipe.llm_recipe import LLMRecipeGenerator
gen = LLMRecipeGenerator(None)
try:
async for token in gen.stream_generate(req, profiles, gaps):
yield f"data: {_json_mod.dumps({'chunk': token})}\n\n"
yield f"data: {_json_mod.dumps({'done': True})}\n\n"
except Exception as exc:
yield f"data: {_json_mod.dumps({'error': str(exc)})}\n\n"
async def _enqueue_recipe_job(session: CloudUser, req: RecipeRequest): async def _enqueue_recipe_job(session: CloudUser, req: RecipeRequest):
"""Queue an async recipe_llm job and return 202 with job_id. """Queue an async recipe_llm job and return 202 with job_id.
@ -144,6 +179,7 @@ async def _enqueue_recipe_job(session: CloudUser, req: RecipeRequest):
async def suggest_recipes( async def suggest_recipes(
req: RecipeRequest, req: RecipeRequest,
async_mode: bool = Query(default=False, alias="async"), async_mode: bool = Query(default=False, alias="async"),
stream: bool = Query(default=False),
session: CloudUser = Depends(get_session), session: CloudUser = Depends(get_session),
store: Store = Depends(get_store), store: Store = Depends(get_store),
): ):
@ -179,6 +215,13 @@ async def suggest_recipes(
req = req.model_copy(update={"level": 2}) req = req.model_copy(update={"level": 2})
orch_fallback = True orch_fallback = True
if stream and req.level in (3, 4):
return StreamingResponse(
_stream_recipe_sse(session.db, req),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)
if req.level in (3, 4) and async_mode: if req.level in (3, 4) and async_mode:
return await _enqueue_recipe_job(session, req) return await _enqueue_recipe_job(session, req)

View file

@ -1,13 +1,14 @@
"""LLM-driven recipe generator for Levels 3 and 4.""" """LLM-driven recipe generator for Levels 3 and 4."""
from __future__ import annotations from __future__ import annotations
import asyncio
import logging import logging
import os import os
import re import re
from contextlib import nullcontext from contextlib import nullcontext
from typing import TYPE_CHECKING from typing import TYPE_CHECKING, AsyncGenerator
from openai import OpenAI from openai import AsyncOpenAI, OpenAI
if TYPE_CHECKING: if TYPE_CHECKING:
from app.db.store import Store from app.db.store import Store
@ -149,8 +150,8 @@ class LLMRecipeGenerator:
return "\n".join(lines) return "\n".join(lines)
_SERVICE_TYPE = "vllm" _SERVICE_TYPE = "cf-text"
_MODEL_CANDIDATES = ["Qwen2.5-3B-Instruct", "Phi-4-mini-instruct"] _MODEL_CANDIDATES = ["granite-4.1-8b", "deepseek-r1-1.5b"]
_TTL_S = 300.0 _TTL_S = 300.0
_CALLER = "kiwi-recipe" _CALLER = "kiwi-recipe"
@ -182,7 +183,12 @@ class LLMRecipeGenerator:
With CF_ORCH_URL set: acquires a vLLM allocation via CFOrchClient and With CF_ORCH_URL set: acquires a vLLM allocation via CFOrchClient and
calls the OpenAI-compatible API directly against the allocated service URL. calls the OpenAI-compatible API directly against the allocated service URL.
Allocation failure falls through to LLMRouter rather than silently returning "". Falls back to LLMRouter when:
- Allocation succeeded but the service is cold (warm=False) avoids
making the user wait for model load; LLMRouter uses Ollama which is
already running.
- Allocation succeeded but the connection to the service URL fails the
agent may have registered the service but failed to start it.
Without CF_ORCH_URL: uses LLMRouter directly. Without CF_ORCH_URL: uses LLMRouter directly.
""" """
ctx = self._get_llm_context() ctx = self._get_llm_context()
@ -208,6 +214,15 @@ class LLMRecipeGenerator:
try: try:
if alloc is not None: if alloc is not None:
# Skip cold services — model not yet loaded means the user would
# wait 60120 s for model load before any response. Use LLMRouter
# (Ollama) instead, which is already warm on the host.
if not alloc.warm:
logger.info(
"cf-orch vllm allocated but cold (warm=False) — releasing and falling back to LLMRouter"
)
raise RuntimeError("vllm cold")
base_url = alloc.url.rstrip("/") + "/v1" base_url = alloc.url.rstrip("/") + "/v1"
client = OpenAI(base_url=base_url, api_key="any") client = OpenAI(base_url=base_url, api_key="any")
model = alloc.model or "__auto__" model = alloc.model or "__auto__"
@ -223,6 +238,20 @@ class LLMRecipeGenerator:
return LLMRouter().complete(prompt) return LLMRouter().complete(prompt)
except Exception as exc: except Exception as exc:
logger.error("LLM call failed: %s", exc) logger.error("LLM call failed: %s", exc)
# When cf-orch gave us an allocation but the service is unreachable
# (cold skip, connection refused, or other error), fall back to
# LLMRouter rather than silently returning empty.
# Skip "vllm" in the fallback order — that backend also routes through
# cf-orch, which would trigger a second (wasted) cold allocation.
if alloc is not None:
logger.info("Falling back to LLMRouter after vllm failure")
try:
from circuitforge_core.llm.router import LLMRouter
router = LLMRouter()
_order = [b for b in (router.config.get("fallback_order") or []) if b != "vllm"]
return router.complete(prompt, fallback_order=_order or None)
except Exception as fallback_exc:
logger.error("LLMRouter fallback also failed: %s", fallback_exc)
return "" return ""
finally: finally:
if ctx is not None: if ctx is not None:
@ -359,3 +388,91 @@ class LLMRecipeGenerator:
suggestions=[suggestion], suggestions=[suggestion],
element_gaps=gaps, element_gaps=gaps,
) )
async def stream_generate(
self,
req: RecipeRequest,
profiles: list,
gaps: list[str],
) -> AsyncGenerator[str, None]:
"""Stream LLM tokens for L3/L4. Yields raw text chunks as they arrive.
Tries cf-orch warm vllm first; falls back to Ollama via AsyncOpenAI.
When neither is reachable, falls back to blocking _call_llm and yields
the complete response as a single chunk so the caller always gets output.
"""
if req.level == 4:
prompt = self.build_level4_prompt(req)
else:
prompt = self.build_level3_prompt(req, profiles, gaps)
# Phase 1: try cf-orch warm vllm (sync allocation, wrapped in thread)
alloc_info = await asyncio.to_thread(self._try_alloc_for_stream)
if alloc_info is not None:
alloc, ctx = alloc_info
try:
async for token in self._stream_openai_compat(
alloc.url.rstrip("/") + "/v1", "any", alloc.model or "__auto__", prompt
):
yield token
return
except Exception as exc:
logger.debug("cf-orch stream failed, falling back to Ollama: %s", exc)
finally:
await asyncio.to_thread(lambda: _safe_exit(ctx))
# Phase 2: Ollama streaming via OpenAI-compat API
from circuitforge_core.llm.router import LLMRouter
router = LLMRouter()
ollama = router.config.get("backends", {}).get("ollama")
if ollama and ollama.get("enabled", True):
base_url = ollama["base_url"]
model = ollama.get("model", "llama3")
try:
async for token in self._stream_openai_compat(base_url, "any", model, prompt):
yield token
return
except Exception as exc:
logger.warning("Ollama streaming failed, falling back to blocking: %s", exc)
# Phase 3: blocking fallback — yields full response at once
result = await asyncio.to_thread(self._call_llm, prompt)
if result:
yield result
def _try_alloc_for_stream(self):
"""Attempt cf-orch allocation synchronously; return (alloc, ctx) or None."""
ctx = self._get_llm_context()
try:
alloc = ctx.__enter__()
if alloc is not None and alloc.warm:
return alloc, ctx
# Not warm — release and signal fallback
_safe_exit(ctx)
except Exception as exc:
logger.debug("cf-orch alloc for stream failed: %s", exc)
return None
@staticmethod
async def _stream_openai_compat(
base_url: str, api_key: str, model: str, prompt: str
) -> AsyncGenerator[str, None]:
client = AsyncOpenAI(base_url=base_url, api_key=api_key)
if model == "__auto__":
models = await client.models.list()
model = models.data[0].id
stream = await client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
stream=True,
)
async for chunk in stream:
if chunk.choices and chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
def _safe_exit(ctx) -> None:
try:
ctx.__exit__(None, None, None)
except Exception:
pass

View file

@ -18,6 +18,10 @@ server {
proxy_set_header X-CF-Session $http_x_cf_session; proxy_set_header X-CF-Session $http_x_cf_session;
# Allow image uploads (barcode/receipt photos from phone cameras). # Allow image uploads (barcode/receipt photos from phone cameras).
client_max_body_size 20m; client_max_body_size 20m;
# LLM inference (recipe suggestions, expiry fallback) can take 60-120s.
# Default proxy_read_timeout is 60s which causes 504s on full recipe generation.
proxy_read_timeout 180s;
proxy_send_timeout 180s;
} }
# Direct-port LAN access (localhost:8515): when VITE_API_BASE='/kiwi', the frontend # Direct-port LAN access (localhost:8515): when VITE_API_BASE='/kiwi', the frontend
@ -34,6 +38,8 @@ server {
proxy_set_header X-Forwarded-Proto $http_x_forwarded_proto; proxy_set_header X-Forwarded-Proto $http_x_forwarded_proto;
proxy_set_header X-CF-Session $http_x_cf_session; proxy_set_header X-CF-Session $http_x_cf_session;
client_max_body_size 20m; client_max_body_size 20m;
proxy_read_timeout 180s;
proxy_send_timeout 180s;
} }
# When accessed directly (localhost:8515) instead of via Caddy (/kiwi path-strip), # When accessed directly (localhost:8515) instead of via Caddy (/kiwi path-strip),

View file

@ -246,9 +246,9 @@
<span id="allergy-hint" class="form-hint">No recipes containing these ingredients will appear.</span> <span id="allergy-hint" class="form-hint">No recipes containing these ingredients will appear.</span>
</div> </div>
<!-- Not Today temporary per-session ingredient exclusions --> <!-- Not Today ingredient exclusions, persisted to localStorage -->
<div class="form-group"> <div class="form-group">
<label class="form-label">Not today <span class="text-muted text-xs">(skip these ingredients this session)</span></label> <label class="form-label">Not today <span class="text-muted text-xs">(saved between visits)</span></label>
<div v-if="recipesStore.excludeIngredients.length > 0" class="tags-wrap flex flex-wrap gap-xs mb-xs"> <div v-if="recipesStore.excludeIngredients.length > 0" class="tags-wrap flex flex-wrap gap-xs mb-xs">
<span <span
v-for="tag in recipesStore.excludeIngredients" v-for="tag in recipesStore.excludeIngredients"
@ -1122,41 +1122,35 @@ async function streamRecipe(level: 3 | 4, wildcardConfirmed = false) {
streamChunks.value = '' streamChunks.value = ''
streamError.value = null streamError.value = null
let tokenData: StreamTokenResponse // Try cf-orch warm vllm path first (returns a direct stream URL)
let tokenData: StreamTokenResponse | null = null
try { try {
tokenData = await recipesAPI.getRecipeStreamToken({ level, wildcard_confirmed: wildcardConfirmed }) tokenData = await recipesAPI.getRecipeStreamToken({ level, wildcard_confirmed: wildcardConfirmed })
} catch (err: unknown) { } catch { /* cf-orch unavailable — fall through to native SSE */ }
isStreaming.value = false
streamError.value = err instanceof Error ? err.message : 'Failed to start stream' if (tokenData) {
const url = `${tokenData.stream_url}?token=${encodeURIComponent(tokenData.token)}`
const es = new EventSource(url)
es.onmessage = (e: MessageEvent) => {
try {
const data = JSON.parse(e.data)
if (data.done) { es.close(); isStreaming.value = false }
else if (data.error) { es.close(); isStreaming.value = false; streamError.value = data.error }
else if (data.chunk) { streamChunks.value += data.chunk }
} catch { /* ignore malformed events */ }
}
es.onerror = () => { es.close(); isStreaming.value = false; streamError.value = 'Stream connection lost' }
return return
} }
const url = `${tokenData.stream_url}?token=${encodeURIComponent(tokenData.token)}` // Native SSE fallback: Kiwi backend streams directly from Ollama
const es = new EventSource(url) await recipesStore.streamSuggest(
pantryItems.value,
es.onmessage = (e: MessageEvent) => { secondaryPantryItems.value,
try { (chunk) => { streamChunks.value += chunk },
const data = JSON.parse(e.data) () => { isStreaming.value = false },
if (data.done) { (err) => { isStreaming.value = false; streamError.value = err },
es.close() )
isStreaming.value = false
} else if (data.error) {
es.close()
isStreaming.value = false
streamError.value = data.error
} else if (data.chunk) {
streamChunks.value += data.chunk
}
} catch {
// ignore malformed events
}
}
es.onerror = () => {
es.close()
isStreaming.value = false
streamError.value = 'Stream connection lost'
}
} }
// Suggest handler // Suggest handler

View file

@ -737,6 +737,54 @@ export const recipesAPI = {
}) })
return response.data return response.data
}, },
/** Stream a recipe via native SSE (Ollama fallback). Calls callbacks as tokens arrive. */
async suggestRecipeStream(
req: RecipeRequest,
onChunk: (chunk: string) => void,
onDone: () => void,
onError: (err: string) => void,
): Promise<void> {
const baseUrl = (api.defaults.baseURL ?? '') as string
let response: Response
try {
response = await fetch(`${baseUrl}/recipes/suggest?stream=true`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(req),
})
} catch (err: unknown) {
onError(err instanceof Error ? err.message : 'Network error')
return
}
if (!response.ok) {
onError(`HTTP ${response.status}`)
return
}
const reader = response.body?.getReader()
if (!reader) { onError('No response body'); return }
const decoder = new TextDecoder()
let buffer = ''
while (true) {
const { done, value } = await reader.read()
if (done) { onDone(); break }
buffer += decoder.decode(value, { stream: true })
const parts = buffer.split('\n\n')
buffer = parts.pop() ?? ''
for (const part of parts) {
if (!part.startsWith('data: ')) continue
try {
const data = JSON.parse(part.slice(6))
if (data.done) { onDone(); return }
else if (data.error) { onError(data.error); return }
else if (data.chunk) { onChunk(data.chunk) }
} catch { /* ignore malformed events */ }
}
}
},
} }
// ========== Settings API ========== // ========== Settings API ==========

View file

@ -379,6 +379,17 @@ export const useRecipesStore = defineStore('recipes', () => {
wildcardConfirmed.value = false wildcardConfirmed.value = false
} }
async function streamSuggest(
pantryItems: string[],
secondaryPantryItems: Record<string, string>,
onChunk: (chunk: string) => void,
onDone: () => void,
onError: (err: string) => void,
): Promise<void> {
const req = _buildRequest(pantryItems, secondaryPantryItems)
await recipesAPI.suggestRecipeStream(req, onChunk, onDone, onError)
}
return { return {
result, result,
loading, loading,
@ -416,6 +427,7 @@ export const useRecipesStore = defineStore('recipes', () => {
missingIngredientMode, missingIngredientMode,
builderFilterMode, builderFilterMode,
suggest, suggest,
streamSuggest,
loadMore, loadMore,
dismiss, dismiss,
undismiss, undismiss,