feat(recipe-scan): add SSE streaming endpoint for cold-start progress feedback
Some checks failed
CI / Frontend (Vue) (push) Waiting to run
CI / Backend (Python) (push) Waiting to run
Mirror / mirror (push) Has been cancelled
Release / release (push) Has been cancelled

POST /recipes/scan/stream emits live status events while cf-docuvision
allocates and processes, replacing the static spinner with phase-aware labels:
  allocating -> scanning -> structuring -> done|error

Uses asyncio.Queue bridge to route progress callbacks from the sync scanner
thread to the async SSE generator. Frontend updated to consume the stream via
fetch + ReadableStream (EventSource does not support POST multipart).

Closes kiwi#136 (companion to the docuvision routing fix).
This commit is contained in:
pyr0ball 2026-05-16 16:24:32 -07:00
parent 4ac24e7920
commit 2df17ec719
4 changed files with 196 additions and 7 deletions

View file

@ -11,6 +11,7 @@ BSL 1.1 -- recipe_scan requires Paid tier or BYOK.
from __future__ import annotations
import asyncio
import json as _json
import logging
import uuid
from pathlib import Path
@ -18,7 +19,7 @@ from typing import Annotated
import aiofiles
from fastapi import APIRouter, Depends, File, HTTPException, UploadFile
from fastapi.responses import JSONResponse
from fastapi.responses import JSONResponse, StreamingResponse
from app.cloud_session import CloudUser, get_session
from app.core.config import settings
@ -168,9 +169,15 @@ async def scan_recipe(
)
raise HTTPException(status_code=422, detail=msg)
except RuntimeError as exc:
msg = str(exc)
logger.warning("Recipe scanner unavailable: %s", msg)
raise HTTPException(
status_code=503,
detail=str(exc),
detail=(
"The recipe scanner is temporarily unavailable — "
"no vision backend could be reached. "
"Try again in a few minutes, or contact support if this persists."
),
)
return _result_to_response(result)
@ -184,6 +191,114 @@ async def scan_recipe(
pass
# ── SSE scan endpoint ─────────────────────────────────────────────────────────
async def _scan_recipe_sse(saved_paths: list[Path], pantry_names: list[str]):
"""Async generator yielding SSE events for a recipe scan.
Emits progress events while the vision service allocates and runs, then a
final "done" event containing the full recipe payload (same shape as the
ScannedRecipeResponse from POST /scan).
Events:
{"status": "allocating", "message": "..."}
{"status": "scanning", "message": "..."}
{"status": "structuring","message": "..."}
{"status": "done", "recipe": {...}}
{"status": "error", "message": "..."}
"""
queue: asyncio.Queue = asyncio.Queue()
loop = asyncio.get_running_loop()
def _run() -> None:
def cb(status: str, message: str) -> None:
loop.call_soon_threadsafe(queue.put_nowait, {"status": status, "message": message})
try:
from app.services.recipe.recipe_scanner import RecipeScanner
result = RecipeScanner().scan(saved_paths, pantry_names=pantry_names, progress_cb=cb)
recipe_dict = _result_to_response(result).model_dump()
loop.call_soon_threadsafe(queue.put_nowait, {"status": "done", "recipe": recipe_dict})
except ValueError as exc:
loop.call_soon_threadsafe(queue.put_nowait, {"status": "error", "message": str(exc)})
except RuntimeError as exc:
loop.call_soon_threadsafe(queue.put_nowait, {"status": "error", "message": str(exc)})
except Exception as exc:
logger.exception("Unexpected error in recipe scan thread")
loop.call_soon_threadsafe(queue.put_nowait, {"status": "error", "message": "Scan failed unexpectedly."})
scan_task = asyncio.ensure_future(asyncio.to_thread(_run))
try:
while True:
try:
event = await asyncio.wait_for(queue.get(), timeout=180.0)
except asyncio.TimeoutError:
yield f"data: {_json.dumps({'status': 'error', 'message': 'Scan timed out after 3 minutes.'})}\n\n"
break
yield f"data: {_json.dumps(event)}\n\n"
if event["status"] in ("done", "error"):
break
finally:
if not scan_task.done():
scan_task.cancel()
@router.post("/scan/stream")
async def scan_recipe_stream(
files: Annotated[list[UploadFile], File(...)],
store: Store = Depends(get_store),
session: CloudUser = Depends(get_session),
):
"""Scan recipe photos and stream SSE progress events during model load.
Use this endpoint instead of POST /scan when you need live feedback during
cold-start model loading (first request after a GPU-idle period can take
30-60 seconds for cf-docuvision to warm up).
Tier: Paid (or BYOK) same gate as POST /scan.
"""
if not can_use("recipe_scan", session.tier, session.has_byok):
raise HTTPException(
status_code=403,
detail=(
"Recipe scanning requires Paid tier or a configured vision backend (BYOK). "
"Set ANTHROPIC_API_KEY or connect to a cf-orch vision service."
),
)
if not files:
raise HTTPException(status_code=422, detail="At least one image file is required.")
if len(files) > 4:
raise HTTPException(status_code=422, detail="Maximum 4 images per scan request.")
for f in files:
ct = (f.content_type or "").lower()
if ct and ct not in _ALLOWED_MIME_TYPES:
raise HTTPException(
status_code=422,
detail=f"Unsupported file type: {ct}. Supported: JPEG, PNG, WebP, HEIC.",
)
saved_paths: list[Path] = []
for f in files:
saved_paths.append(await _save_upload_temp(f))
inventory = await asyncio.to_thread(store.list_inventory)
pantry_names = [item["product_name"] for item in inventory if item.get("product_name")]
async def generate():
try:
async for chunk in _scan_recipe_sse(saved_paths, pantry_names):
yield chunk
finally:
for p in saved_paths:
try:
p.unlink(missing_ok=True)
except Exception:
pass
return StreamingResponse(generate(), media_type="text/event-stream")
# ── Save endpoint ──────────────────────────────────────────────────────────────
@router.post("/scan/save", response_model=UserRecipeResponse, status_code=201)

View file

@ -21,6 +21,7 @@ import json
import logging
import os
import re
from collections.abc import Callable
from dataclasses import dataclass
from pathlib import Path
@ -214,12 +215,26 @@ def _build_ocr_extraction_prompt(ocr_text: str) -> str:
)
def _call_vision_backend(image_paths: list[Path], prompt: str) -> str:
def _call_vision_backend(
image_paths: list[Path],
prompt: str,
progress_cb: "Callable[[str, str], None] | None" = None,
) -> str:
"""Dispatch to the best available vision backend.
Priority: cf-orch docuvision (OCR + text LLM) -> local Qwen2.5-VL -> Anthropic API.
Raises RuntimeError with a clear message when no backend is available.
Args:
image_paths: Images to process.
prompt: Extraction prompt (used by local VLM / Anthropic paths).
progress_cb: Optional callback(status, message) for SSE progress events.
Called synchronously from the thread caller bridges to async.
"""
def _progress(status: str, message: str) -> None:
if progress_cb:
progress_cb(status, message)
errors: list[str] = []
# 1. Try cf-orch task allocation → cf-docuvision OCR, then text LLM structuring.
@ -233,8 +248,10 @@ def _call_vision_backend(image_paths: list[Path], prompt: str) -> str:
from circuitforge_core.llm.router import LLMRouter
try:
_progress("allocating", "Starting vision service...")
with task_allocate("kiwi", "recipe_scan", service_hint="cf-docuvision", ttl_s=120.0) as alloc:
# Step 1: OCR each image via cf-docuvision
_progress("scanning", "Extracting recipe text from photo...")
doc_client = DocuvisionClient(alloc.url)
ocr_parts: list[str] = []
for i, path in enumerate(image_paths):
@ -247,6 +264,7 @@ def _call_vision_backend(image_paths: list[Path], prompt: str) -> str:
raise ValueError("Docuvision returned no text — image may not be a recipe")
# Step 2: Text LLM structures OCR output into recipe JSON
_progress("structuring", "Parsing recipe structure...")
text = LLMRouter().complete(_build_ocr_extraction_prompt(combined_ocr))
if text:
return text
@ -379,6 +397,7 @@ class RecipeScanner:
self,
image_paths: list[Path],
pantry_names: list[str] | None = None,
progress_cb: Callable[[str, str], None] | None = None,
) -> ScannedRecipeResult:
"""Extract a structured recipe from one or more photos.
@ -400,7 +419,7 @@ class RecipeScanner:
raise ValueError(f"Maximum {MAX_IMAGES} images per scan (got {len(image_paths)})")
# Call vision backend
raw_text = _call_vision_backend(image_paths, _EXTRACTION_PROMPT)
raw_text = _call_vision_backend(image_paths, _EXTRACTION_PROMPT, progress_cb=progress_cb)
# Parse JSON from VLM output
data = _parse_scanner_json(raw_text)

View file

@ -112,8 +112,8 @@
<path d="M23 19a2 2 0 0 1-2 2H3a2 2 0 0 1-2-2V8a2 2 0 0 1 2-2h4l2-3h6l2 3h4a2 2 0 0 1 2 2z"/>
<circle cx="12" cy="13" r="4"/>
</svg>
<p class="processing-label">Extracting recipe from {{ selectedFiles.length > 1 ? selectedFiles.length + ' photos' : 'photo' }}...</p>
<p class="processing-sub">This can take 10-30 seconds.</p>
<p class="processing-label">{{ scanStatusMessage }}</p>
<p class="processing-sub">This can take up to a minute on first use.</p>
</div>
</div>
@ -329,13 +329,18 @@ function removeFile(index: number) {
// Scan
const extracted = ref<ScannedRecipe | null>(null)
const scanStatusMessage = ref('Uploading photos...')
async function startScan() {
if (selectedFiles.value.length === 0) return
uploadError.value = ''
scanStatusMessage.value = 'Uploading photos...'
phase.value = 'processing'
try {
const result = await recipeScanAPI.scan(selectedFiles.value)
const result = await recipeScanAPI.scanStream(
selectedFiles.value,
(_status: string, message: string) => { scanStatusMessage.value = message },
)
extracted.value = result
initEditState(result)
phase.value = 'review'

View file

@ -1326,6 +1326,56 @@ export const recipeScanAPI = {
}).then((r) => r.data)
},
/** Scan recipe photos with live SSE progress events.
*
* Calls onProgress(status, message) for each intermediate event
* ("allocating", "scanning", "structuring"), then resolves with the final
* ScannedRecipe on success. Rejects on error or timeout.
*/
async scanStream(
files: File[],
onProgress: (status: string, message: string) => void,
): Promise<ScannedRecipe> {
const form = new FormData()
files.forEach((f) => form.append('files', f))
const response = await fetch(`${API_BASE_URL}/recipes/scan/stream`, {
method: 'POST',
body: form,
})
if (!response.ok || !response.body) {
let detail = ''
try { detail = await response.text() } catch (_) { /* ignore */ }
throw new Error(detail || `Scan failed (${response.status})`)
}
const reader = response.body.getReader()
const decoder = new TextDecoder()
let buffer = ''
while (true) {
const { done, value } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
const lines = buffer.split('\n')
buffer = lines.pop() ?? ''
for (const line of lines) {
if (!line.startsWith('data: ')) continue
let data: Record<string, unknown>
try { data = JSON.parse(line.slice(6)) } catch { continue }
if (data.status === 'done') return data.recipe as ScannedRecipe
if (data.status === 'error') throw new Error((data.message as string) || 'Scan failed')
onProgress(data.status as string, data.message as string)
}
}
throw new Error('Stream ended without a result')
},
/** Save a reviewed/edited scanned recipe to user_recipes. */
saveScanned(recipe: Omit<ScannedRecipe, 'pantry_match_pct' | 'confidence' | 'warnings'> & { source?: string }): Promise<UserRecipe> {
return api.post('/recipes/scan/save', recipe).then((r) => r.data)