diff --git a/app/api/endpoints/recipes.py b/app/api/endpoints/recipes.py index a76106d..e28f792 100644 --- a/app/api/endpoints/recipes.py +++ b/app/api/endpoints/recipes.py @@ -6,7 +6,9 @@ import logging from pathlib import Path from typing import Annotated +import json as _json_mod from fastapi import APIRouter, Depends, HTTPException, Query +from fastapi.responses import StreamingResponse from app.cloud_session import CloudUser, _auth_label, get_session @@ -103,6 +105,39 @@ def _build_stream_prompt(db_path: Path, level: int) -> str: store.close() +async def _stream_recipe_sse(db_path: Path, req: RecipeRequest): + """Async generator that yields SSE events for a streaming recipe request. + + Phase 1 (thread): classify pantry items using a temporary Store. + Phase 2 (async): stream tokens from LLM via LLMRecipeGenerator.stream_generate(). + """ + def _prep(db_path: Path) -> tuple[list, list[str]]: + from app.services.recipe.element_classifier import IngredientClassifier + store = Store(db_path) + try: + classifier = IngredientClassifier(store) + profiles = classifier.classify_batch(req.pantry_items) + gaps = classifier.identify_gaps(profiles) + return profiles, gaps + finally: + store.close() + + try: + profiles, gaps = await asyncio.to_thread(_prep, db_path) + except Exception as exc: + yield f"data: {_json_mod.dumps({'error': str(exc)})}\n\n" + return + + from app.services.recipe.llm_recipe import LLMRecipeGenerator + gen = LLMRecipeGenerator(None) + try: + async for token in gen.stream_generate(req, profiles, gaps): + yield f"data: {_json_mod.dumps({'chunk': token})}\n\n" + yield f"data: {_json_mod.dumps({'done': True})}\n\n" + except Exception as exc: + yield f"data: {_json_mod.dumps({'error': str(exc)})}\n\n" + + async def _enqueue_recipe_job(session: CloudUser, req: RecipeRequest): """Queue an async recipe_llm job and return 202 with job_id. @@ -144,6 +179,7 @@ async def _enqueue_recipe_job(session: CloudUser, req: RecipeRequest): async def suggest_recipes( req: RecipeRequest, async_mode: bool = Query(default=False, alias="async"), + stream: bool = Query(default=False), session: CloudUser = Depends(get_session), store: Store = Depends(get_store), ): @@ -179,6 +215,13 @@ async def suggest_recipes( req = req.model_copy(update={"level": 2}) orch_fallback = True + if stream and req.level in (3, 4): + return StreamingResponse( + _stream_recipe_sse(session.db, req), + media_type="text/event-stream", + headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}, + ) + if req.level in (3, 4) and async_mode: return await _enqueue_recipe_job(session, req) diff --git a/app/services/recipe/llm_recipe.py b/app/services/recipe/llm_recipe.py index 76c59e1..fd5acb6 100644 --- a/app/services/recipe/llm_recipe.py +++ b/app/services/recipe/llm_recipe.py @@ -1,13 +1,14 @@ """LLM-driven recipe generator for Levels 3 and 4.""" from __future__ import annotations +import asyncio import logging import os import re from contextlib import nullcontext -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, AsyncGenerator -from openai import OpenAI +from openai import AsyncOpenAI, OpenAI if TYPE_CHECKING: from app.db.store import Store @@ -149,8 +150,8 @@ class LLMRecipeGenerator: return "\n".join(lines) - _SERVICE_TYPE = "vllm" - _MODEL_CANDIDATES = ["Qwen2.5-3B-Instruct", "Phi-4-mini-instruct"] + _SERVICE_TYPE = "cf-text" + _MODEL_CANDIDATES = ["granite-4.1-8b", "deepseek-r1-1.5b"] _TTL_S = 300.0 _CALLER = "kiwi-recipe" @@ -182,7 +183,12 @@ class LLMRecipeGenerator: With CF_ORCH_URL set: acquires a vLLM allocation via CFOrchClient and calls the OpenAI-compatible API directly against the allocated service URL. - Allocation failure falls through to LLMRouter rather than silently returning "". + Falls back to LLMRouter when: + - Allocation succeeded but the service is cold (warm=False) — avoids + making the user wait for model load; LLMRouter uses Ollama which is + already running. + - Allocation succeeded but the connection to the service URL fails — the + agent may have registered the service but failed to start it. Without CF_ORCH_URL: uses LLMRouter directly. """ ctx = self._get_llm_context() @@ -208,6 +214,15 @@ class LLMRecipeGenerator: try: if alloc is not None: + # Skip cold services — model not yet loaded means the user would + # wait 60–120 s for model load before any response. Use LLMRouter + # (Ollama) instead, which is already warm on the host. + if not alloc.warm: + logger.info( + "cf-orch vllm allocated but cold (warm=False) — releasing and falling back to LLMRouter" + ) + raise RuntimeError("vllm cold") + base_url = alloc.url.rstrip("/") + "/v1" client = OpenAI(base_url=base_url, api_key="any") model = alloc.model or "__auto__" @@ -223,6 +238,20 @@ class LLMRecipeGenerator: return LLMRouter().complete(prompt) except Exception as exc: logger.error("LLM call failed: %s", exc) + # When cf-orch gave us an allocation but the service is unreachable + # (cold skip, connection refused, or other error), fall back to + # LLMRouter rather than silently returning empty. + # Skip "vllm" in the fallback order — that backend also routes through + # cf-orch, which would trigger a second (wasted) cold allocation. + if alloc is not None: + logger.info("Falling back to LLMRouter after vllm failure") + try: + from circuitforge_core.llm.router import LLMRouter + router = LLMRouter() + _order = [b for b in (router.config.get("fallback_order") or []) if b != "vllm"] + return router.complete(prompt, fallback_order=_order or None) + except Exception as fallback_exc: + logger.error("LLMRouter fallback also failed: %s", fallback_exc) return "" finally: if ctx is not None: @@ -359,3 +388,91 @@ class LLMRecipeGenerator: suggestions=[suggestion], element_gaps=gaps, ) + + async def stream_generate( + self, + req: RecipeRequest, + profiles: list, + gaps: list[str], + ) -> AsyncGenerator[str, None]: + """Stream LLM tokens for L3/L4. Yields raw text chunks as they arrive. + + Tries cf-orch warm vllm first; falls back to Ollama via AsyncOpenAI. + When neither is reachable, falls back to blocking _call_llm and yields + the complete response as a single chunk so the caller always gets output. + """ + if req.level == 4: + prompt = self.build_level4_prompt(req) + else: + prompt = self.build_level3_prompt(req, profiles, gaps) + + # Phase 1: try cf-orch warm vllm (sync allocation, wrapped in thread) + alloc_info = await asyncio.to_thread(self._try_alloc_for_stream) + if alloc_info is not None: + alloc, ctx = alloc_info + try: + async for token in self._stream_openai_compat( + alloc.url.rstrip("/") + "/v1", "any", alloc.model or "__auto__", prompt + ): + yield token + return + except Exception as exc: + logger.debug("cf-orch stream failed, falling back to Ollama: %s", exc) + finally: + await asyncio.to_thread(lambda: _safe_exit(ctx)) + + # Phase 2: Ollama streaming via OpenAI-compat API + from circuitforge_core.llm.router import LLMRouter + router = LLMRouter() + ollama = router.config.get("backends", {}).get("ollama") + if ollama and ollama.get("enabled", True): + base_url = ollama["base_url"] + model = ollama.get("model", "llama3") + try: + async for token in self._stream_openai_compat(base_url, "any", model, prompt): + yield token + return + except Exception as exc: + logger.warning("Ollama streaming failed, falling back to blocking: %s", exc) + + # Phase 3: blocking fallback — yields full response at once + result = await asyncio.to_thread(self._call_llm, prompt) + if result: + yield result + + def _try_alloc_for_stream(self): + """Attempt cf-orch allocation synchronously; return (alloc, ctx) or None.""" + ctx = self._get_llm_context() + try: + alloc = ctx.__enter__() + if alloc is not None and alloc.warm: + return alloc, ctx + # Not warm — release and signal fallback + _safe_exit(ctx) + except Exception as exc: + logger.debug("cf-orch alloc for stream failed: %s", exc) + return None + + @staticmethod + async def _stream_openai_compat( + base_url: str, api_key: str, model: str, prompt: str + ) -> AsyncGenerator[str, None]: + client = AsyncOpenAI(base_url=base_url, api_key=api_key) + if model == "__auto__": + models = await client.models.list() + model = models.data[0].id + stream = await client.chat.completions.create( + model=model, + messages=[{"role": "user", "content": prompt}], + stream=True, + ) + async for chunk in stream: + if chunk.choices and chunk.choices[0].delta.content: + yield chunk.choices[0].delta.content + + +def _safe_exit(ctx) -> None: + try: + ctx.__exit__(None, None, None) + except Exception: + pass diff --git a/docker/web/nginx.cloud.conf b/docker/web/nginx.cloud.conf index 85e509b..067612c 100644 --- a/docker/web/nginx.cloud.conf +++ b/docker/web/nginx.cloud.conf @@ -18,6 +18,10 @@ server { proxy_set_header X-CF-Session $http_x_cf_session; # Allow image uploads (barcode/receipt photos from phone cameras). client_max_body_size 20m; + # LLM inference (recipe suggestions, expiry fallback) can take 60-120s. + # Default proxy_read_timeout is 60s which causes 504s on full recipe generation. + proxy_read_timeout 180s; + proxy_send_timeout 180s; } # Direct-port LAN access (localhost:8515): when VITE_API_BASE='/kiwi', the frontend @@ -34,6 +38,8 @@ server { proxy_set_header X-Forwarded-Proto $http_x_forwarded_proto; proxy_set_header X-CF-Session $http_x_cf_session; client_max_body_size 20m; + proxy_read_timeout 180s; + proxy_send_timeout 180s; } # When accessed directly (localhost:8515) instead of via Caddy (/kiwi path-strip), diff --git a/frontend/src/components/RecipesView.vue b/frontend/src/components/RecipesView.vue index 04b3219..55c8cb4 100644 --- a/frontend/src/components/RecipesView.vue +++ b/frontend/src/components/RecipesView.vue @@ -246,9 +246,9 @@ No recipes containing these ingredients will appear. - +
- +
{ + try { + const data = JSON.parse(e.data) + if (data.done) { es.close(); isStreaming.value = false } + else if (data.error) { es.close(); isStreaming.value = false; streamError.value = data.error } + else if (data.chunk) { streamChunks.value += data.chunk } + } catch { /* ignore malformed events */ } + } + es.onerror = () => { es.close(); isStreaming.value = false; streamError.value = 'Stream connection lost' } return } - const url = `${tokenData.stream_url}?token=${encodeURIComponent(tokenData.token)}` - const es = new EventSource(url) - - es.onmessage = (e: MessageEvent) => { - try { - const data = JSON.parse(e.data) - if (data.done) { - es.close() - isStreaming.value = false - } else if (data.error) { - es.close() - isStreaming.value = false - streamError.value = data.error - } else if (data.chunk) { - streamChunks.value += data.chunk - } - } catch { - // ignore malformed events - } - } - - es.onerror = () => { - es.close() - isStreaming.value = false - streamError.value = 'Stream connection lost' - } + // Native SSE fallback: Kiwi backend streams directly from Ollama + await recipesStore.streamSuggest( + pantryItems.value, + secondaryPantryItems.value, + (chunk) => { streamChunks.value += chunk }, + () => { isStreaming.value = false }, + (err) => { isStreaming.value = false; streamError.value = err }, + ) } // Suggest handler diff --git a/frontend/src/services/api.ts b/frontend/src/services/api.ts index 5e6c981..ad20fa7 100644 --- a/frontend/src/services/api.ts +++ b/frontend/src/services/api.ts @@ -737,6 +737,54 @@ export const recipesAPI = { }) return response.data }, + + /** Stream a recipe via native SSE (Ollama fallback). Calls callbacks as tokens arrive. */ + async suggestRecipeStream( + req: RecipeRequest, + onChunk: (chunk: string) => void, + onDone: () => void, + onError: (err: string) => void, + ): Promise { + const baseUrl = (api.defaults.baseURL ?? '') as string + let response: Response + try { + response = await fetch(`${baseUrl}/recipes/suggest?stream=true`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(req), + }) + } catch (err: unknown) { + onError(err instanceof Error ? err.message : 'Network error') + return + } + + if (!response.ok) { + onError(`HTTP ${response.status}`) + return + } + + const reader = response.body?.getReader() + if (!reader) { onError('No response body'); return } + + const decoder = new TextDecoder() + let buffer = '' + while (true) { + const { done, value } = await reader.read() + if (done) { onDone(); break } + buffer += decoder.decode(value, { stream: true }) + const parts = buffer.split('\n\n') + buffer = parts.pop() ?? '' + for (const part of parts) { + if (!part.startsWith('data: ')) continue + try { + const data = JSON.parse(part.slice(6)) + if (data.done) { onDone(); return } + else if (data.error) { onError(data.error); return } + else if (data.chunk) { onChunk(data.chunk) } + } catch { /* ignore malformed events */ } + } + } + }, } // ========== Settings API ========== diff --git a/frontend/src/stores/recipes.ts b/frontend/src/stores/recipes.ts index de6027a..6dd83d7 100644 --- a/frontend/src/stores/recipes.ts +++ b/frontend/src/stores/recipes.ts @@ -379,6 +379,17 @@ export const useRecipesStore = defineStore('recipes', () => { wildcardConfirmed.value = false } + async function streamSuggest( + pantryItems: string[], + secondaryPantryItems: Record, + onChunk: (chunk: string) => void, + onDone: () => void, + onError: (err: string) => void, + ): Promise { + const req = _buildRequest(pantryItems, secondaryPantryItems) + await recipesAPI.suggestRecipeStream(req, onChunk, onDone, onError) + } + return { result, loading, @@ -416,6 +427,7 @@ export const useRecipesStore = defineStore('recipes', () => { missingIngredientMode, builderFilterMode, suggest, + streamSuggest, loadMore, dismiss, undismiss,