Compare commits
7 commits
79f345aae6
...
0bef082ff0
| Author | SHA1 | Date | |
|---|---|---|---|
| 0bef082ff0 | |||
| c6f45be1ba | |||
| be050f5492 | |||
| e2658f743f | |||
| dbc4aa3c68 | |||
| ed4595d960 | |||
| eba536070c |
14 changed files with 428 additions and 14 deletions
|
|
@ -51,6 +51,12 @@ ENABLE_OCR=false
|
|||
DEBUG=false
|
||||
CLOUD_MODE=false
|
||||
DEMO_MODE=false
|
||||
# Product identifier reported in cf-orch coordinator analytics for per-app breakdown
|
||||
CF_APP_NAME=kiwi
|
||||
# USE_ORCH_SCHEDULER: use coordinator-aware multi-GPU scheduler instead of local FIFO.
|
||||
# Unset = auto-detect: true if CLOUD_MODE or circuitforge_orch is installed (paid+ local).
|
||||
# Set false to force LocalScheduler even when cf-orch is present.
|
||||
# USE_ORCH_SCHEDULER=false
|
||||
|
||||
# Cloud mode (set in compose.cloud.yml; also set here for reference)
|
||||
# CLOUD_DATA_ROOT=/devl/kiwi-cloud-data
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ from app.db.store import Store
|
|||
from app.models.schemas.recipe import (
|
||||
AssemblyTemplateOut,
|
||||
BuildRequest,
|
||||
RecipeJobStatus,
|
||||
RecipeRequest,
|
||||
RecipeResult,
|
||||
RecipeSuggestion,
|
||||
|
|
@ -57,12 +58,50 @@ def _suggest_in_thread(db_path: Path, req: RecipeRequest) -> RecipeResult:
|
|||
store.close()
|
||||
|
||||
|
||||
@router.post("/suggest", response_model=RecipeResult)
|
||||
async def _enqueue_recipe_job(session: CloudUser, req: RecipeRequest):
|
||||
"""Queue an async recipe_llm job and return 202 with job_id.
|
||||
|
||||
Falls back to synchronous generation in CLOUD_MODE (scheduler polls only
|
||||
the shared settings DB, not per-user DBs — see snipe#45 / kiwi backlog).
|
||||
"""
|
||||
import json
|
||||
import uuid
|
||||
from fastapi.responses import JSONResponse
|
||||
from app.cloud_session import CLOUD_MODE
|
||||
from app.tasks.runner import insert_task
|
||||
|
||||
if CLOUD_MODE:
|
||||
log.warning("recipe_llm async jobs not supported in CLOUD_MODE — falling back to sync")
|
||||
result = await asyncio.to_thread(_suggest_in_thread, session.db, req)
|
||||
return result
|
||||
|
||||
job_id = f"rec_{uuid.uuid4().hex}"
|
||||
|
||||
def _create(db_path: Path) -> int:
|
||||
store = Store(db_path)
|
||||
try:
|
||||
row = store.create_recipe_job(job_id, session.user_id, req.model_dump_json())
|
||||
return row["id"]
|
||||
finally:
|
||||
store.close()
|
||||
|
||||
int_id = await asyncio.to_thread(_create, session.db)
|
||||
params_json = json.dumps({"job_id": job_id})
|
||||
task_id, is_new = insert_task(session.db, "recipe_llm", int_id, params=params_json)
|
||||
if is_new:
|
||||
from app.tasks.scheduler import get_scheduler
|
||||
get_scheduler(session.db).enqueue(task_id, "recipe_llm", int_id, params_json)
|
||||
|
||||
return JSONResponse(content={"job_id": job_id, "status": "queued"}, status_code=202)
|
||||
|
||||
|
||||
@router.post("/suggest")
|
||||
async def suggest_recipes(
|
||||
req: RecipeRequest,
|
||||
async_mode: bool = Query(default=False, alias="async"),
|
||||
session: CloudUser = Depends(get_session),
|
||||
store: Store = Depends(get_store),
|
||||
) -> RecipeResult:
|
||||
):
|
||||
log.info("recipes auth=%s tier=%s level=%s", _auth_label(session.user_id), session.tier, req.level)
|
||||
# Inject session-authoritative tier/byok immediately — client-supplied values are ignored.
|
||||
# Also read stored unit_system preference; default to metric if not set.
|
||||
|
|
@ -95,12 +134,49 @@ async def suggest_recipes(
|
|||
req = req.model_copy(update={"level": 2})
|
||||
orch_fallback = True
|
||||
|
||||
if req.level in (3, 4) and async_mode:
|
||||
return await _enqueue_recipe_job(session, req)
|
||||
|
||||
result = await asyncio.to_thread(_suggest_in_thread, session.db, req)
|
||||
if orch_fallback:
|
||||
result = result.model_copy(update={"orch_fallback": True})
|
||||
return result
|
||||
|
||||
|
||||
@router.get("/jobs/{job_id}", response_model=RecipeJobStatus)
|
||||
async def get_recipe_job_status(
|
||||
job_id: str,
|
||||
session: CloudUser = Depends(get_session),
|
||||
) -> RecipeJobStatus:
|
||||
"""Poll the status of an async recipe generation job.
|
||||
|
||||
Returns 404 when job_id is unknown or belongs to a different user.
|
||||
On status='done' with suggestions=[], the LLM returned empty — client
|
||||
should show a 'no recipe generated, try again' message.
|
||||
"""
|
||||
def _get(db_path: Path) -> dict | None:
|
||||
store = Store(db_path)
|
||||
try:
|
||||
return store.get_recipe_job(job_id, session.user_id)
|
||||
finally:
|
||||
store.close()
|
||||
|
||||
row = await asyncio.to_thread(_get, session.db)
|
||||
if row is None:
|
||||
raise HTTPException(status_code=404, detail="Job not found.")
|
||||
|
||||
result = None
|
||||
if row["status"] == "done" and row["result"]:
|
||||
result = RecipeResult.model_validate_json(row["result"])
|
||||
|
||||
return RecipeJobStatus(
|
||||
job_id=row["job_id"],
|
||||
status=row["status"],
|
||||
result=result,
|
||||
error=row["error"],
|
||||
)
|
||||
|
||||
|
||||
@router.get("/browse/domains")
|
||||
async def list_browse_domains(
|
||||
session: CloudUser = Depends(get_session),
|
||||
|
|
|
|||
|
|
@ -66,6 +66,13 @@ class Settings:
|
|||
|
||||
# Feature flags
|
||||
ENABLE_OCR: bool = os.environ.get("ENABLE_OCR", "false").lower() in ("1", "true", "yes")
|
||||
# Use OrchestratedScheduler (coordinator-aware, multi-GPU fan-out) instead of
|
||||
# LocalScheduler. Defaults to true in CLOUD_MODE; can be set independently
|
||||
# for multi-GPU local rigs that don't need full cloud auth.
|
||||
USE_ORCH_SCHEDULER: bool | None = (
|
||||
None if os.environ.get("USE_ORCH_SCHEDULER") is None
|
||||
else os.environ.get("USE_ORCH_SCHEDULER", "").lower() in ("1", "true", "yes")
|
||||
)
|
||||
|
||||
# Runtime
|
||||
DEBUG: bool = os.environ.get("DEBUG", "false").lower() in ("1", "true", "yes")
|
||||
|
|
|
|||
14
app/db/migrations/034_recipe_jobs.sql
Normal file
14
app/db/migrations/034_recipe_jobs.sql
Normal file
|
|
@ -0,0 +1,14 @@
|
|||
-- Migration 034: async recipe generation job queue
|
||||
CREATE TABLE IF NOT EXISTS recipe_jobs (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
job_id TEXT NOT NULL UNIQUE,
|
||||
user_id TEXT NOT NULL,
|
||||
status TEXT NOT NULL DEFAULT 'queued',
|
||||
request TEXT NOT NULL,
|
||||
result TEXT,
|
||||
error TEXT,
|
||||
created_at TEXT NOT NULL DEFAULT (datetime('now')),
|
||||
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_recipe_jobs_job_id ON recipe_jobs (job_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_recipe_jobs_user_id ON recipe_jobs (user_id, created_at DESC);
|
||||
|
|
@ -736,6 +736,41 @@ class Store:
|
|||
row = self._fetch_one("SELECT * FROM recipes WHERE id = ?", (recipe_id,))
|
||||
return row
|
||||
|
||||
# --- Async recipe jobs ---
|
||||
|
||||
def create_recipe_job(self, job_id: str, user_id: str, request_json: str) -> sqlite3.Row:
|
||||
return self._insert_returning(
|
||||
"INSERT INTO recipe_jobs (job_id, user_id, status, request) VALUES (?,?,?,?) RETURNING *",
|
||||
(job_id, user_id, "queued", request_json),
|
||||
)
|
||||
|
||||
def get_recipe_job(self, job_id: str, user_id: str) -> sqlite3.Row | None:
|
||||
return self._fetch_one(
|
||||
"SELECT * FROM recipe_jobs WHERE job_id=? AND user_id=?",
|
||||
(job_id, user_id),
|
||||
)
|
||||
|
||||
def update_recipe_job_running(self, job_id: str) -> None:
|
||||
self.conn.execute(
|
||||
"UPDATE recipe_jobs SET status='running', updated_at=datetime('now') WHERE job_id=?",
|
||||
(job_id,),
|
||||
)
|
||||
self.conn.commit()
|
||||
|
||||
def complete_recipe_job(self, job_id: str, result_json: str) -> None:
|
||||
self.conn.execute(
|
||||
"UPDATE recipe_jobs SET status='done', result=?, updated_at=datetime('now') WHERE job_id=?",
|
||||
(result_json, job_id),
|
||||
)
|
||||
self.conn.commit()
|
||||
|
||||
def fail_recipe_job(self, job_id: str, error: str) -> None:
|
||||
self.conn.execute(
|
||||
"UPDATE recipe_jobs SET status='failed', error=?, updated_at=datetime('now') WHERE job_id=?",
|
||||
(error, job_id),
|
||||
)
|
||||
self.conn.commit()
|
||||
|
||||
def upsert_built_recipe(
|
||||
self,
|
||||
external_id: str,
|
||||
|
|
|
|||
|
|
@ -61,6 +61,18 @@ class RecipeResult(BaseModel):
|
|||
orch_fallback: bool = False # True when orch budget exhausted; fell back to local LLM
|
||||
|
||||
|
||||
class RecipeJobQueued(BaseModel):
|
||||
job_id: str
|
||||
status: str = "queued"
|
||||
|
||||
|
||||
class RecipeJobStatus(BaseModel):
|
||||
job_id: str
|
||||
status: str
|
||||
result: RecipeResult | None = None
|
||||
error: str | None = None
|
||||
|
||||
|
||||
class NutritionFilters(BaseModel):
|
||||
"""Optional per-serving upper bounds for macro filtering. None = no filter."""
|
||||
max_calories: float | None = None
|
||||
|
|
|
|||
|
|
@ -181,6 +181,19 @@ class LLMRecipeGenerator:
|
|||
try:
|
||||
alloc = ctx.__enter__()
|
||||
except Exception as exc:
|
||||
msg = str(exc)
|
||||
# 429 = coordinator at capacity (all nodes at max_concurrent limit).
|
||||
# Don't fall back to LLMRouter — it's also overloaded and the slow
|
||||
# fallback causes nginx 504s. Return "" fast so the caller degrades
|
||||
# gracefully (empty recipe result) rather than timing out.
|
||||
if "429" in msg or "max_concurrent" in msg.lower():
|
||||
logger.info("cf-orch at capacity — returning empty result (graceful degradation)")
|
||||
if ctx is not None:
|
||||
try:
|
||||
ctx.__exit__(None, None, None)
|
||||
except Exception:
|
||||
pass
|
||||
return ""
|
||||
logger.debug("cf-orch allocation failed, falling back to LLMRouter: %s", exc)
|
||||
ctx = None # __enter__ raised — do not call __exit__
|
||||
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ from app.services.expiration_predictor import ExpirationPredictor
|
|||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
LLM_TASK_TYPES: frozenset[str] = frozenset({"expiry_llm_fallback"})
|
||||
LLM_TASK_TYPES: frozenset[str] = frozenset({"expiry_llm_fallback", "recipe_llm"})
|
||||
|
||||
VRAM_BUDGETS: dict[str, float] = {
|
||||
# ExpirationPredictor uses a small LLM (16 tokens out, single pass).
|
||||
|
|
@ -88,6 +88,8 @@ def run_task(
|
|||
try:
|
||||
if task_type == "expiry_llm_fallback":
|
||||
_run_expiry_llm_fallback(db_path, job_id, params)
|
||||
elif task_type == "recipe_llm":
|
||||
_run_recipe_llm(db_path, job_id, params)
|
||||
else:
|
||||
raise ValueError(f"Unknown kiwi task type: {task_type!r}")
|
||||
_update_task_status(db_path, task_id, "completed")
|
||||
|
|
@ -143,3 +145,41 @@ def _run_expiry_llm_fallback(
|
|||
expiry,
|
||||
days,
|
||||
)
|
||||
|
||||
|
||||
def _run_recipe_llm(db_path: Path, _job_id_int: int, params: str | None) -> None:
|
||||
"""Run LLM recipe generation for an async recipe job.
|
||||
|
||||
params JSON keys:
|
||||
job_id (required) — recipe_jobs.job_id string (e.g. "rec_a1b2c3...")
|
||||
|
||||
Creates its own Store — follows same pattern as _suggest_in_thread.
|
||||
MUST call store.fail_recipe_job() before re-raising so recipe_jobs.status
|
||||
doesn't stay 'running' while background_tasks shows 'failed'.
|
||||
"""
|
||||
from app.db.store import Store
|
||||
from app.models.schemas.recipe import RecipeRequest
|
||||
from app.services.recipe.recipe_engine import RecipeEngine
|
||||
|
||||
p = json.loads(params or "{}")
|
||||
recipe_job_id: str = p.get("job_id", "")
|
||||
if not recipe_job_id:
|
||||
raise ValueError("recipe_llm: 'job_id' is required in params")
|
||||
|
||||
store = Store(db_path)
|
||||
try:
|
||||
store.update_recipe_job_running(recipe_job_id)
|
||||
row = store._fetch_one(
|
||||
"SELECT request FROM recipe_jobs WHERE job_id=?", (recipe_job_id,)
|
||||
)
|
||||
if row is None:
|
||||
raise ValueError(f"recipe_llm: recipe_jobs row not found: {recipe_job_id!r}")
|
||||
req = RecipeRequest.model_validate_json(row["request"])
|
||||
result = RecipeEngine(store).suggest(req)
|
||||
store.complete_recipe_job(recipe_job_id, result.model_dump_json())
|
||||
log.info("recipe_llm: job %s completed (%d suggestion(s))", recipe_job_id, len(result.suggestions))
|
||||
except Exception as exc:
|
||||
store.fail_recipe_job(recipe_job_id, str(exc))
|
||||
raise
|
||||
finally:
|
||||
store.close()
|
||||
|
|
|
|||
|
|
@ -1,5 +1,10 @@
|
|||
# app/tasks/scheduler.py
|
||||
"""Kiwi LLM task scheduler — thin shim over circuitforge_core.tasks.scheduler."""
|
||||
"""Kiwi LLM task scheduler — thin shim over circuitforge_core.tasks.scheduler.
|
||||
|
||||
Local mode (CLOUD_MODE unset): LocalScheduler — simple FIFO, no coordinator.
|
||||
Cloud mode (CLOUD_MODE=true): OrchestratedScheduler — coordinator-aware, fans
|
||||
out concurrent jobs across all registered cf-orch GPU nodes.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
|
@ -7,15 +12,68 @@ from pathlib import Path
|
|||
from circuitforge_core.tasks.scheduler import (
|
||||
TaskScheduler,
|
||||
get_scheduler as _base_get_scheduler,
|
||||
reset_scheduler, # re-export for tests
|
||||
reset_scheduler as _reset_local, # re-export for tests
|
||||
)
|
||||
|
||||
from app.cloud_session import CLOUD_MODE
|
||||
from app.core.config import settings
|
||||
from app.tasks.runner import LLM_TASK_TYPES, VRAM_BUDGETS, run_task
|
||||
|
||||
|
||||
def _orch_available() -> bool:
|
||||
"""Return True if circuitforge_orch is installed in this environment."""
|
||||
try:
|
||||
import circuitforge_orch # noqa: F401
|
||||
return True
|
||||
except ImportError:
|
||||
return False
|
||||
|
||||
|
||||
def _use_orch() -> bool:
|
||||
"""Return True if the OrchestratedScheduler should be used.
|
||||
|
||||
Priority order:
|
||||
1. USE_ORCH_SCHEDULER env var — explicit override always wins.
|
||||
2. CLOUD_MODE=true — use orch in managed cloud deployments.
|
||||
3. circuitforge_orch installed — paid+ local users who have cf-orch
|
||||
set up get coordinator-aware scheduling (local GPU first) automatically.
|
||||
"""
|
||||
override = settings.USE_ORCH_SCHEDULER
|
||||
if override is not None:
|
||||
return override
|
||||
return CLOUD_MODE or _orch_available()
|
||||
|
||||
|
||||
def get_scheduler(db_path: Path) -> TaskScheduler:
|
||||
"""Return the process-level TaskScheduler singleton for Kiwi."""
|
||||
"""Return the process-level TaskScheduler singleton for Kiwi.
|
||||
|
||||
OrchestratedScheduler: coordinator-aware, fans out concurrent jobs across
|
||||
all registered cf-orch GPU nodes. Active when USE_ORCH_SCHEDULER=true,
|
||||
CLOUD_MODE=true, or circuitforge_orch is installed locally (paid+ users
|
||||
running their own cf-orch stack get this automatically; local GPU is
|
||||
preferred by the coordinator's allocation queue).
|
||||
|
||||
LocalScheduler: serial FIFO, no coordinator dependency. Free-tier local
|
||||
installs without circuitforge_orch installed use this automatically.
|
||||
"""
|
||||
if _use_orch():
|
||||
try:
|
||||
from circuitforge_orch.scheduler import get_orch_scheduler
|
||||
except ImportError:
|
||||
import logging
|
||||
logging.getLogger(__name__).warning(
|
||||
"circuitforge_orch not installed — falling back to LocalScheduler"
|
||||
)
|
||||
else:
|
||||
return get_orch_scheduler(
|
||||
db_path=db_path,
|
||||
run_task_fn=run_task,
|
||||
task_types=LLM_TASK_TYPES,
|
||||
vram_budgets=VRAM_BUDGETS,
|
||||
coordinator_url=settings.COORDINATOR_URL,
|
||||
service_name="kiwi",
|
||||
)
|
||||
|
||||
return _base_get_scheduler(
|
||||
db_path=db_path,
|
||||
run_task_fn=run_task,
|
||||
|
|
@ -24,3 +82,15 @@ def get_scheduler(db_path: Path) -> TaskScheduler:
|
|||
coordinator_url=settings.COORDINATOR_URL,
|
||||
service_name="kiwi",
|
||||
)
|
||||
|
||||
|
||||
def reset_scheduler() -> None:
|
||||
"""Shut down and clear the active scheduler singleton. TEST TEARDOWN ONLY."""
|
||||
if _use_orch():
|
||||
try:
|
||||
from circuitforge_orch.scheduler import reset_orch_scheduler
|
||||
reset_orch_scheduler()
|
||||
return
|
||||
except ImportError:
|
||||
pass
|
||||
_reset_local()
|
||||
|
|
|
|||
|
|
@ -8,9 +8,9 @@ services:
|
|||
# Docker can follow the symlink inside the container.
|
||||
- /Library/Assets/kiwi:/Library/Assets/kiwi:rw
|
||||
|
||||
# cf-orch agent sidecar: registers kiwi as a GPU node with the coordinator.
|
||||
# cf-orch agent sidecar: registers this machine as GPU node "sif" with the coordinator.
|
||||
# The API scheduler uses COORDINATOR_URL to lease VRAM cooperatively; this
|
||||
# agent makes kiwi's VRAM usage visible on the orchestrator dashboard.
|
||||
# agent makes the local VRAM usage visible on the orchestrator dashboard.
|
||||
cf-orch-agent:
|
||||
image: kiwi-api # reuse local api image — cf-core already installed there
|
||||
network_mode: host
|
||||
|
|
@ -21,7 +21,7 @@ services:
|
|||
command: >
|
||||
conda run -n kiwi cf-orch agent
|
||||
--coordinator ${COORDINATOR_URL:-http://10.1.10.71:7700}
|
||||
--node-id kiwi
|
||||
--node-id sif
|
||||
--host 0.0.0.0
|
||||
--port 7702
|
||||
--advertise-host ${CF_ORCH_ADVERTISE_HOST:-10.1.10.71}
|
||||
|
|
|
|||
74
config/llm.yaml.example
Normal file
74
config/llm.yaml.example
Normal file
|
|
@ -0,0 +1,74 @@
|
|||
# Kiwi — LLM backend configuration
|
||||
#
|
||||
# Copy to ~/.config/circuitforge/llm.yaml (shared across all CF products)
|
||||
# or to config/llm.yaml (Kiwi-local, takes precedence).
|
||||
#
|
||||
# Kiwi uses LLMs for:
|
||||
# - Expiry prediction fallback (unknown products not in the lookup table)
|
||||
# - Meal planning suggestions
|
||||
#
|
||||
# Local inference (Ollama / vLLM) is the default path — no API key required.
|
||||
# BYOK (bring your own key): set api_key_env to point at your API key env var.
|
||||
# cf-orch trunk: set CF_ORCH_URL env var to allocate cf-text on-demand via
|
||||
# the coordinator instead of hitting a static URL.
|
||||
|
||||
backends:
|
||||
ollama:
|
||||
type: openai_compat
|
||||
enabled: true
|
||||
base_url: http://localhost:11434/v1
|
||||
model: llama3.2:3b
|
||||
api_key: ollama
|
||||
supports_images: false
|
||||
|
||||
vllm:
|
||||
type: openai_compat
|
||||
enabled: false
|
||||
base_url: http://localhost:8000/v1
|
||||
model: __auto__ # resolved from /v1/models at runtime
|
||||
api_key: ''
|
||||
supports_images: false
|
||||
|
||||
# ── cf-orch trunk services ──────────────────────────────────────────────────
|
||||
# These allocate via cf-orch rather than connecting to a static URL.
|
||||
# cf-orch starts the service on-demand and returns its live URL.
|
||||
# Set CF_ORCH_URL env var or fill in url below; leave enabled: false if
|
||||
# cf-orch is not deployed in your environment.
|
||||
|
||||
cf_text:
|
||||
type: openai_compat
|
||||
enabled: false
|
||||
base_url: http://localhost:8008/v1 # fallback when cf-orch is not available
|
||||
model: __auto__
|
||||
api_key: any
|
||||
supports_images: false
|
||||
cf_orch:
|
||||
service: cf-text
|
||||
# model_candidates: leave empty to use the service's default_model,
|
||||
# or specify a catalog alias (e.g. "qwen2.5-3b").
|
||||
model_candidates: []
|
||||
ttl_s: 3600
|
||||
|
||||
# ── Cloud / BYOK ───────────────────────────────────────────────────────────
|
||||
|
||||
anthropic:
|
||||
type: anthropic
|
||||
enabled: false
|
||||
model: claude-haiku-4-5-20251001
|
||||
api_key_env: ANTHROPIC_API_KEY
|
||||
supports_images: false
|
||||
|
||||
openai:
|
||||
type: openai_compat
|
||||
enabled: false
|
||||
base_url: https://api.openai.com/v1
|
||||
model: gpt-4o-mini
|
||||
api_key_env: OPENAI_API_KEY
|
||||
supports_images: false
|
||||
|
||||
fallback_order:
|
||||
- cf_text
|
||||
- ollama
|
||||
- vllm
|
||||
- anthropic
|
||||
- openai
|
||||
|
|
@ -287,7 +287,10 @@
|
|||
@click="handleSuggest"
|
||||
>
|
||||
<span v-if="recipesStore.loading && !isLoadingMore">
|
||||
<span class="spinner spinner-sm inline-spinner"></span> Finding recipes…
|
||||
<span class="spinner spinner-sm inline-spinner"></span>
|
||||
<span v-if="recipesStore.jobStatus === 'queued'">Queued…</span>
|
||||
<span v-else-if="recipesStore.jobStatus === 'running'">Generating…</span>
|
||||
<span v-else>Finding recipes…</span>
|
||||
</span>
|
||||
<span v-else>Suggest Recipes</span>
|
||||
</button>
|
||||
|
|
@ -312,7 +315,9 @@
|
|||
|
||||
<!-- Screen reader announcement for loading + results -->
|
||||
<div aria-live="polite" aria-atomic="true" class="sr-only">
|
||||
<span v-if="recipesStore.loading">Finding recipes…</span>
|
||||
<span v-if="recipesStore.loading && recipesStore.jobStatus === 'queued'">Recipe request queued, waiting for model…</span>
|
||||
<span v-else-if="recipesStore.loading && recipesStore.jobStatus === 'running'">Generating your recipe now…</span>
|
||||
<span v-else-if="recipesStore.loading">Finding recipes…</span>
|
||||
<span v-else-if="recipesStore.result">
|
||||
{{ filteredSuggestions.length }} recipe{{ filteredSuggestions.length !== 1 ? 's' : '' }} found
|
||||
</span>
|
||||
|
|
|
|||
|
|
@ -524,6 +524,15 @@ export interface RecipeResult {
|
|||
rate_limit_count: number
|
||||
}
|
||||
|
||||
export type RecipeJobStatusValue = 'queued' | 'running' | 'done' | 'failed'
|
||||
|
||||
export interface RecipeJobStatus {
|
||||
job_id: string
|
||||
status: RecipeJobStatusValue
|
||||
result: RecipeResult | null
|
||||
error: string | null
|
||||
}
|
||||
|
||||
export interface RecipeRequest {
|
||||
pantry_items: string[]
|
||||
secondary_pantry_items: Record<string, string>
|
||||
|
|
@ -593,6 +602,18 @@ export const recipesAPI = {
|
|||
const response = await api.post('/recipes/suggest', req, { timeout: 120000 })
|
||||
return response.data
|
||||
},
|
||||
|
||||
/** Submit an async job for L3/L4 generation. Returns job_id + initial status. */
|
||||
async suggestAsync(req: RecipeRequest): Promise<{ job_id: string; status: string }> {
|
||||
const response = await api.post('/recipes/suggest', req, { params: { async: 'true' }, timeout: 15000 })
|
||||
return response.data
|
||||
},
|
||||
|
||||
/** Poll an async job. Returns the full status including result once done. */
|
||||
async pollJob(jobId: string): Promise<RecipeJobStatus> {
|
||||
const response = await api.get(`/recipes/jobs/${jobId}`, { timeout: 10000 })
|
||||
return response.data
|
||||
},
|
||||
async getRecipe(id: number): Promise<RecipeSuggestion> {
|
||||
const response = await api.get(`/recipes/${id}`)
|
||||
return response.data
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@
|
|||
|
||||
import { defineStore } from 'pinia'
|
||||
import { ref, computed, watch } from 'vue'
|
||||
import { recipesAPI, type RecipeResult, type RecipeSuggestion, type RecipeRequest, type NutritionFilters } from '../services/api'
|
||||
import { recipesAPI, type RecipeResult, type RecipeSuggestion, type RecipeRequest, type RecipeJobStatusValue, type NutritionFilters } from '../services/api'
|
||||
|
||||
const DISMISSED_KEY = 'kiwi:dismissed_recipes'
|
||||
const DISMISS_TTL_MS = 7 * 24 * 60 * 60 * 1000
|
||||
|
|
@ -121,6 +121,7 @@ export const useRecipesStore = defineStore('recipes', () => {
|
|||
const result = ref<RecipeResult | null>(null)
|
||||
const loading = ref(false)
|
||||
const error = ref<string | null>(null)
|
||||
const jobStatus = ref<RecipeJobStatusValue | null>(null)
|
||||
|
||||
// Request parameters
|
||||
const level = ref(1)
|
||||
|
|
@ -199,18 +200,57 @@ export const useRecipesStore = defineStore('recipes', () => {
|
|||
async function suggest(pantryItems: string[], secondaryPantryItems: Record<string, string> = {}) {
|
||||
loading.value = true
|
||||
error.value = null
|
||||
jobStatus.value = null
|
||||
seenIds.value = new Set()
|
||||
|
||||
try {
|
||||
result.value = await recipesAPI.suggest(_buildRequest(pantryItems, secondaryPantryItems))
|
||||
_trackSeen(result.value.suggestions)
|
||||
if (level.value >= 3) {
|
||||
await _suggestAsync(pantryItems, secondaryPantryItems)
|
||||
} else {
|
||||
result.value = await recipesAPI.suggest(_buildRequest(pantryItems, secondaryPantryItems))
|
||||
_trackSeen(result.value.suggestions)
|
||||
}
|
||||
} catch (err: unknown) {
|
||||
error.value = err instanceof Error ? err.message : 'Failed to get recipe suggestions'
|
||||
} finally {
|
||||
loading.value = false
|
||||
jobStatus.value = null
|
||||
}
|
||||
}
|
||||
|
||||
async function _suggestAsync(pantryItems: string[], secondaryPantryItems: Record<string, string>) {
|
||||
const queued = await recipesAPI.suggestAsync(_buildRequest(pantryItems, secondaryPantryItems))
|
||||
|
||||
// CLOUD_MODE or future sync fallback: server returned result directly (status 200)
|
||||
if ('suggestions' in queued) {
|
||||
result.value = queued as unknown as RecipeResult
|
||||
_trackSeen(result.value.suggestions)
|
||||
return
|
||||
}
|
||||
|
||||
jobStatus.value = 'queued'
|
||||
const { job_id } = queued
|
||||
const deadline = Date.now() + 90_000
|
||||
const POLL_MS = 2_500
|
||||
|
||||
while (Date.now() < deadline) {
|
||||
await new Promise((resolve) => setTimeout(resolve, POLL_MS))
|
||||
const poll = await recipesAPI.pollJob(job_id)
|
||||
jobStatus.value = poll.status
|
||||
|
||||
if (poll.status === 'done') {
|
||||
result.value = poll.result
|
||||
if (result.value) _trackSeen(result.value.suggestions)
|
||||
return
|
||||
}
|
||||
if (poll.status === 'failed') {
|
||||
throw new Error(poll.error ?? 'Recipe generation failed')
|
||||
}
|
||||
}
|
||||
|
||||
throw new Error('Recipe generation timed out — the model may be busy. Try again.')
|
||||
}
|
||||
|
||||
async function loadMore(pantryItems: string[], secondaryPantryItems: Record<string, string> = {}) {
|
||||
if (!result.value || loading.value) return
|
||||
loading.value = true
|
||||
|
|
@ -308,6 +348,7 @@ export const useRecipesStore = defineStore('recipes', () => {
|
|||
result,
|
||||
loading,
|
||||
error,
|
||||
jobStatus,
|
||||
level,
|
||||
constraints,
|
||||
allergies,
|
||||
|
|
|
|||
Loading…
Reference in a new issue