diff --git a/app/api/endpoints/recipe_scan.py b/app/api/endpoints/recipe_scan.py
index 7fa1515..6f6e5e5 100644
--- a/app/api/endpoints/recipe_scan.py
+++ b/app/api/endpoints/recipe_scan.py
@@ -11,6 +11,7 @@ BSL 1.1 -- recipe_scan requires Paid tier or BYOK.
 from __future__ import annotations
 
 import asyncio
+import json as _json
 import logging
 import uuid
 from pathlib import Path
@@ -18,7 +19,7 @@ from typing import Annotated
 
 import aiofiles
 from fastapi import APIRouter, Depends, File, HTTPException, UploadFile
-from fastapi.responses import JSONResponse
+from fastapi.responses import JSONResponse, StreamingResponse
 
 from app.cloud_session import CloudUser, get_session
 from app.core.config import settings
@@ -168,9 +169,15 @@ async def scan_recipe(
             )
             raise HTTPException(status_code=422, detail=msg)
         except RuntimeError as exc:
+            msg = str(exc)
+            logger.warning("Recipe scanner unavailable: %s", msg)
             raise HTTPException(
                 status_code=503,
-                detail=str(exc),
+                detail=(
+                    "The recipe scanner is temporarily unavailable — "
+                    "no vision backend could be reached. "
+                    "Try again in a few minutes, or contact support if this persists."
+                ),
             )
 
         return _result_to_response(result)
@@ -184,6 +191,160 @@ async def scan_recipe(
                 pass
 
 
+# ── SSE scan endpoint ─────────────────────────────────────────────────────────
+
+# Client-facing text for a backend outage; the raw exception is only logged.
+_SCANNER_UNAVAILABLE_MSG = (
+    "The recipe scanner is temporarily unavailable — "
+    "no vision backend could be reached. "
+    "Try again in a few minutes, or contact support if this persists."
+)
+
+# Longest allowed gap between two scan events before the stream gives up.
+_SSE_EVENT_TIMEOUT_S = 180.0
+
+
+def _sse_event(payload: dict) -> str:
+    """Serialize *payload* as one SSE ``data:`` frame."""
+    return f"data: {_json.dumps(payload)}\n\n"
+
+
+def _remove_temp_files(paths: list[Path]) -> None:
+    """Best-effort removal of uploaded temp files; failures are only logged."""
+    for p in paths:
+        try:
+            p.unlink(missing_ok=True)
+        except Exception:
+            logger.debug("Could not remove temp upload %s", p, exc_info=True)
+
+
+async def _scan_recipe_sse(saved_paths: list[Path], pantry_names: list[str]):
+    """Async generator yielding SSE events for a recipe scan.
+
+    Runs the blocking scanner in a worker thread and relays its progress
+    callbacks through an asyncio queue. The stream ends with a "done" event
+    containing the full recipe payload (same shape as the
+    ScannedRecipeResponse from POST /scan) or with a single "error" event.
+
+    Events:
+        {"status": "allocating", "message": "..."}
+        {"status": "scanning",   "message": "..."}
+        {"status": "structuring","message": "..."}
+        {"status": "done",       "recipe": {...}}
+        {"status": "error",      "message": "..."}
+    """
+    queue: asyncio.Queue = asyncio.Queue()
+    loop = asyncio.get_running_loop()
+
+    def _emit(event: dict) -> None:
+        # Runs in the worker thread, so hop back onto the event loop.
+        loop.call_soon_threadsafe(queue.put_nowait, event)
+
+    def _run() -> None:
+        def cb(status: str, message: str) -> None:
+            _emit({"status": status, "message": message})
+
+        try:
+            from app.services.recipe.recipe_scanner import RecipeScanner
+            result = RecipeScanner().scan(saved_paths, pantry_names=pantry_names, progress_cb=cb)
+            _emit({"status": "done", "recipe": _result_to_response(result).model_dump()})
+        except ValueError as exc:
+            _emit({"status": "error", "message": str(exc)})
+        except RuntimeError as exc:
+            # Same policy as the 503 from POST /scan: log the backend detail,
+            # show the client a generic message instead of leaking internals.
+            logger.warning("Recipe scanner unavailable: %s", exc)
+            _emit({"status": "error", "message": _SCANNER_UNAVAILABLE_MSG})
+        except Exception:
+            logger.exception("Unexpected error in recipe scan thread")
+            _emit({"status": "error", "message": "Scan failed unexpectedly."})
+
+    scan_task = asyncio.ensure_future(asyncio.to_thread(_run))
+    try:
+        while True:
+            try:
+                event = await asyncio.wait_for(queue.get(), timeout=_SSE_EVENT_TIMEOUT_S)
+            except asyncio.TimeoutError:
+                yield _sse_event({"status": "error", "message": "Scan timed out after 3 minutes."})
+                break
+            yield _sse_event(event)
+            if event["status"] in ("done", "error"):
+                break
+    finally:
+        # NOTE: this only detaches from the worker; a scan already running in
+        # the thread cannot be interrupted and continues to completion.
+        if not scan_task.done():
+            scan_task.cancel()
+
+
+@router.post("/scan/stream")
+async def scan_recipe_stream(
+    files: Annotated[list[UploadFile], File(...)],
+    store: Store = Depends(get_store),
+    session: CloudUser = Depends(get_session),
+):
+    """Scan recipe photos and stream SSE progress events during model load.
+
+    Use this endpoint instead of POST /scan when you need live feedback during
+    cold-start model loading (first request after a GPU-idle period can take
+    30-60 seconds for cf-docuvision to warm up).
+
+    Tier: Paid (or BYOK) — same gate as POST /scan.
+
+    Raises:
+        HTTPException: 403 when the tier gate rejects the session; 422 when no
+            file, more than 4 files, or an unsupported content type is sent.
+    """
+    if not can_use("recipe_scan", session.tier, session.has_byok):
+        raise HTTPException(
+            status_code=403,
+            detail=(
+                "Recipe scanning requires Paid tier or a configured vision backend (BYOK). "
+                "Set ANTHROPIC_API_KEY or connect to a cf-orch vision service."
+            ),
+        )
+
+    if not files:
+        raise HTTPException(status_code=422, detail="At least one image file is required.")
+    if len(files) > 4:
+        raise HTTPException(status_code=422, detail="Maximum 4 images per scan request.")
+
+    for f in files:
+        ct = (f.content_type or "").lower()
+        if ct and ct not in _ALLOWED_MIME_TYPES:
+            raise HTTPException(
+                status_code=422,
+                detail=f"Unsupported file type: {ct}. Supported: JPEG, PNG, WebP, HEIC.",
+            )
+
+    saved_paths: list[Path] = []
+    try:
+        for f in files:
+            saved_paths.append(await _save_upload_temp(f))
+
+        inventory = await asyncio.to_thread(store.list_inventory)
+        pantry_names = [item["product_name"] for item in inventory if item.get("product_name")]
+    except BaseException:
+        # generate() normally owns cleanup, but it never starts on this path,
+        # so remove whatever was already written before re-raising.
+        _remove_temp_files(saved_paths)
+        raise
+
+    async def generate():
+        try:
+            async for chunk in _scan_recipe_sse(saved_paths, pantry_names):
+                yield chunk
+        finally:
+            _remove_temp_files(saved_paths)
+
+    return StreamingResponse(
+        generate(),
+        media_type="text/event-stream",
+        # Ask intermediaries not to cache or buffer the progress events.
+        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
+    )
+
+
 # ── Save endpoint ──────────────────────────────────────────────────────────────
 
 @router.post("/scan/save", response_model=UserRecipeResponse, status_code=201)
diff --git a/app/services/recipe/recipe_scanner.py b/app/services/recipe/recipe_scanner.py
index fde512a..a1a3f7a 100644
--- a/app/services/recipe/recipe_scanner.py
+++ b/app/services/recipe/recipe_scanner.py
@@ -21,6 +21,7 @@ import json
 import logging
 import os
 import re
+from collections.abc import Callable
 from dataclasses import
dataclass from pathlib import Path @@ -214,12 +215,26 @@ def _build_ocr_extraction_prompt(ocr_text: str) -> str: ) -def _call_vision_backend(image_paths: list[Path], prompt: str) -> str: +def _call_vision_backend( + image_paths: list[Path], + prompt: str, + progress_cb: "Callable[[str, str], None] | None" = None, +) -> str: """Dispatch to the best available vision backend. Priority: cf-orch docuvision (OCR + text LLM) -> local Qwen2.5-VL -> Anthropic API. Raises RuntimeError with a clear message when no backend is available. + + Args: + image_paths: Images to process. + prompt: Extraction prompt (used by local VLM / Anthropic paths). + progress_cb: Optional callback(status, message) for SSE progress events. + Called synchronously from the thread — caller bridges to async. """ + def _progress(status: str, message: str) -> None: + if progress_cb: + progress_cb(status, message) + errors: list[str] = [] # 1. Try cf-orch task allocation → cf-docuvision OCR, then text LLM structuring. @@ -233,8 +248,10 @@ def _call_vision_backend(image_paths: list[Path], prompt: str) -> str: from circuitforge_core.llm.router import LLMRouter try: + _progress("allocating", "Starting vision service...") with task_allocate("kiwi", "recipe_scan", service_hint="cf-docuvision", ttl_s=120.0) as alloc: # Step 1: OCR each image via cf-docuvision + _progress("scanning", "Extracting recipe text from photo...") doc_client = DocuvisionClient(alloc.url) ocr_parts: list[str] = [] for i, path in enumerate(image_paths): @@ -247,6 +264,7 @@ def _call_vision_backend(image_paths: list[Path], prompt: str) -> str: raise ValueError("Docuvision returned no text — image may not be a recipe") # Step 2: Text LLM structures OCR output into recipe JSON + _progress("structuring", "Parsing recipe structure...") text = LLMRouter().complete(_build_ocr_extraction_prompt(combined_ocr)) if text: return text @@ -379,6 +397,7 @@ class RecipeScanner: self, image_paths: list[Path], pantry_names: list[str] | None = None, + 
progress_cb: Callable[[str, str], None] | None = None, ) -> ScannedRecipeResult: """Extract a structured recipe from one or more photos. @@ -400,7 +419,7 @@ class RecipeScanner: raise ValueError(f"Maximum {MAX_IMAGES} images per scan (got {len(image_paths)})") # Call vision backend - raw_text = _call_vision_backend(image_paths, _EXTRACTION_PROMPT) + raw_text = _call_vision_backend(image_paths, _EXTRACTION_PROMPT, progress_cb=progress_cb) # Parse JSON from VLM output data = _parse_scanner_json(raw_text) diff --git a/frontend/src/components/RecipeScanModal.vue b/frontend/src/components/RecipeScanModal.vue index 77f8b7a..26004cf 100644 --- a/frontend/src/components/RecipeScanModal.vue +++ b/frontend/src/components/RecipeScanModal.vue @@ -112,8 +112,8 @@ -

Extracting recipe from {{ selectedFiles.length > 1 ? selectedFiles.length + ' photos' : 'photo' }}...

-

This can take 10-30 seconds.

+

{{ scanStatusMessage }}

+

This can take up to a minute on first use.

@@ -329,13 +329,18 @@ function removeFile(index: number) {
 // ── Scan ──────────────────────────────────────────────────────────────────────
 
 const extracted = ref(null)
+const scanStatusMessage = ref('Uploading photos...')
 
 async function startScan() {
   if (selectedFiles.value.length === 0) return
   uploadError.value = ''
+  scanStatusMessage.value = 'Uploading photos...'
   phase.value = 'processing'
   try {
-    const result = await recipeScanAPI.scan(selectedFiles.value)
+    const result = await recipeScanAPI.scanStream(
+      selectedFiles.value,
+      (_status: string, message: string) => { scanStatusMessage.value = message },
+    )
     extracted.value = result
     initEditState(result)
     phase.value = 'review'
diff --git a/frontend/src/services/api.ts b/frontend/src/services/api.ts
index f618d95..e95bbb7 100644
--- a/frontend/src/services/api.ts
+++ b/frontend/src/services/api.ts
@@ -1326,6 +1326,68 @@ export const recipeScanAPI = {
     }).then((r) => r.data)
   },
 
+  /** Scan recipe photos with live SSE progress events.
+   *
+   * Calls onProgress(status, message) for each intermediate event
+   * ("allocating", "scanning", "structuring"), then resolves with the final
+   * ScannedRecipe on success. Rejects on an HTTP error, an "error" event, or
+   * when the stream ends without a "done" event.
+   */
+  async scanStream(
+    files: File[],
+    onProgress: (status: string, message: string) => void,
+  ): Promise<ScannedRecipe> {
+    const form = new FormData()
+    files.forEach((f) => form.append('files', f))
+
+    const response = await fetch(`${API_BASE_URL}/recipes/scan/stream`, {
+      method: 'POST',
+      body: form,
+    })
+
+    if (!response.ok || !response.body) {
+      let detail = ''
+      try {
+        const raw = await response.text()
+        detail = raw
+        // FastAPI errors arrive as {"detail": "..."}; show the text, not the JSON.
+        const parsed = JSON.parse(raw)
+        if (typeof parsed?.detail === 'string') detail = parsed.detail
+      } catch (_) { /* non-JSON or unreadable body: keep what we have */ }
+      throw new Error(detail || `Scan failed (${response.status})`)
+    }
+
+    const reader = response.body.getReader()
+    const decoder = new TextDecoder()
+    let buffer = ''
+
+    try {
+      while (true) {
+        const { done, value } = await reader.read()
+        // On EOF, flush the decoder and terminate any unfinished last line.
+        buffer += done ? decoder.decode() + '\n' : decoder.decode(value, { stream: true })
+        const lines = buffer.split('\n')
+        buffer = lines.pop() ?? ''
+
+        for (const line of lines) {
+          if (!line.startsWith('data: ')) continue
+          let data: Record<string, unknown>
+          try { data = JSON.parse(line.slice(6)) } catch { continue }
+
+          if (data.status === 'done') return data.recipe as ScannedRecipe
+          if (data.status === 'error') throw new Error((data.message as string) || 'Scan failed')
+          onProgress(data.status as string, data.message as string)
+        }
+        if (done) break
+      }
+    } finally {
+      // Stop reading when leaving early (done/error event or a thrown error).
+      reader.cancel().catch(() => { /* stream already closed */ })
+    }
+
+    throw new Error('Stream ended without a result')
+  },
+
   /** Save a reviewed/edited scanned recipe to user_recipes. */
   saveScanned(recipe: Omit & { source?: string }): Promise {
     return api.post('/recipes/scan/save', recipe).then((r) => r.data)