Compare commits
7 commits
79f345aae6
...
0bef082ff0
| Author | SHA1 | Date | |
|---|---|---|---|
| 0bef082ff0 | |||
| c6f45be1ba | |||
| be050f5492 | |||
| e2658f743f | |||
| dbc4aa3c68 | |||
| ed4595d960 | |||
| eba536070c |
14 changed files with 428 additions and 14 deletions
|
|
@ -51,6 +51,12 @@ ENABLE_OCR=false
|
||||||
DEBUG=false
|
DEBUG=false
|
||||||
CLOUD_MODE=false
|
CLOUD_MODE=false
|
||||||
DEMO_MODE=false
|
DEMO_MODE=false
|
||||||
|
# Product identifier reported in cf-orch coordinator analytics for per-app breakdown
|
||||||
|
CF_APP_NAME=kiwi
|
||||||
|
# USE_ORCH_SCHEDULER: use coordinator-aware multi-GPU scheduler instead of local FIFO.
|
||||||
|
# Unset = auto-detect: true if CLOUD_MODE or circuitforge_orch is installed (paid+ local).
|
||||||
|
# Set false to force LocalScheduler even when cf-orch is present.
|
||||||
|
# USE_ORCH_SCHEDULER=false
|
||||||
|
|
||||||
# Cloud mode (set in compose.cloud.yml; also set here for reference)
|
# Cloud mode (set in compose.cloud.yml; also set here for reference)
|
||||||
# CLOUD_DATA_ROOT=/devl/kiwi-cloud-data
|
# CLOUD_DATA_ROOT=/devl/kiwi-cloud-data
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,7 @@ from app.db.store import Store
|
||||||
from app.models.schemas.recipe import (
|
from app.models.schemas.recipe import (
|
||||||
AssemblyTemplateOut,
|
AssemblyTemplateOut,
|
||||||
BuildRequest,
|
BuildRequest,
|
||||||
|
RecipeJobStatus,
|
||||||
RecipeRequest,
|
RecipeRequest,
|
||||||
RecipeResult,
|
RecipeResult,
|
||||||
RecipeSuggestion,
|
RecipeSuggestion,
|
||||||
|
|
@ -57,12 +58,50 @@ def _suggest_in_thread(db_path: Path, req: RecipeRequest) -> RecipeResult:
|
||||||
store.close()
|
store.close()
|
||||||
|
|
||||||
|
|
||||||
@router.post("/suggest", response_model=RecipeResult)
|
async def _enqueue_recipe_job(session: CloudUser, req: RecipeRequest):
|
||||||
|
"""Queue an async recipe_llm job and return 202 with job_id.
|
||||||
|
|
||||||
|
Falls back to synchronous generation in CLOUD_MODE (scheduler polls only
|
||||||
|
the shared settings DB, not per-user DBs — see snipe#45 / kiwi backlog).
|
||||||
|
"""
|
||||||
|
import json
|
||||||
|
import uuid
|
||||||
|
from fastapi.responses import JSONResponse
|
||||||
|
from app.cloud_session import CLOUD_MODE
|
||||||
|
from app.tasks.runner import insert_task
|
||||||
|
|
||||||
|
if CLOUD_MODE:
|
||||||
|
log.warning("recipe_llm async jobs not supported in CLOUD_MODE — falling back to sync")
|
||||||
|
result = await asyncio.to_thread(_suggest_in_thread, session.db, req)
|
||||||
|
return result
|
||||||
|
|
||||||
|
job_id = f"rec_{uuid.uuid4().hex}"
|
||||||
|
|
||||||
|
def _create(db_path: Path) -> int:
|
||||||
|
store = Store(db_path)
|
||||||
|
try:
|
||||||
|
row = store.create_recipe_job(job_id, session.user_id, req.model_dump_json())
|
||||||
|
return row["id"]
|
||||||
|
finally:
|
||||||
|
store.close()
|
||||||
|
|
||||||
|
int_id = await asyncio.to_thread(_create, session.db)
|
||||||
|
params_json = json.dumps({"job_id": job_id})
|
||||||
|
task_id, is_new = insert_task(session.db, "recipe_llm", int_id, params=params_json)
|
||||||
|
if is_new:
|
||||||
|
from app.tasks.scheduler import get_scheduler
|
||||||
|
get_scheduler(session.db).enqueue(task_id, "recipe_llm", int_id, params_json)
|
||||||
|
|
||||||
|
return JSONResponse(content={"job_id": job_id, "status": "queued"}, status_code=202)
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/suggest")
|
||||||
async def suggest_recipes(
|
async def suggest_recipes(
|
||||||
req: RecipeRequest,
|
req: RecipeRequest,
|
||||||
|
async_mode: bool = Query(default=False, alias="async"),
|
||||||
session: CloudUser = Depends(get_session),
|
session: CloudUser = Depends(get_session),
|
||||||
store: Store = Depends(get_store),
|
store: Store = Depends(get_store),
|
||||||
) -> RecipeResult:
|
):
|
||||||
log.info("recipes auth=%s tier=%s level=%s", _auth_label(session.user_id), session.tier, req.level)
|
log.info("recipes auth=%s tier=%s level=%s", _auth_label(session.user_id), session.tier, req.level)
|
||||||
# Inject session-authoritative tier/byok immediately — client-supplied values are ignored.
|
# Inject session-authoritative tier/byok immediately — client-supplied values are ignored.
|
||||||
# Also read stored unit_system preference; default to metric if not set.
|
# Also read stored unit_system preference; default to metric if not set.
|
||||||
|
|
@ -95,12 +134,49 @@ async def suggest_recipes(
|
||||||
req = req.model_copy(update={"level": 2})
|
req = req.model_copy(update={"level": 2})
|
||||||
orch_fallback = True
|
orch_fallback = True
|
||||||
|
|
||||||
|
if req.level in (3, 4) and async_mode:
|
||||||
|
return await _enqueue_recipe_job(session, req)
|
||||||
|
|
||||||
result = await asyncio.to_thread(_suggest_in_thread, session.db, req)
|
result = await asyncio.to_thread(_suggest_in_thread, session.db, req)
|
||||||
if orch_fallback:
|
if orch_fallback:
|
||||||
result = result.model_copy(update={"orch_fallback": True})
|
result = result.model_copy(update={"orch_fallback": True})
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/jobs/{job_id}", response_model=RecipeJobStatus)
|
||||||
|
async def get_recipe_job_status(
|
||||||
|
job_id: str,
|
||||||
|
session: CloudUser = Depends(get_session),
|
||||||
|
) -> RecipeJobStatus:
|
||||||
|
"""Poll the status of an async recipe generation job.
|
||||||
|
|
||||||
|
Returns 404 when job_id is unknown or belongs to a different user.
|
||||||
|
On status='done' with suggestions=[], the LLM returned empty — client
|
||||||
|
should show a 'no recipe generated, try again' message.
|
||||||
|
"""
|
||||||
|
def _get(db_path: Path) -> dict | None:
|
||||||
|
store = Store(db_path)
|
||||||
|
try:
|
||||||
|
return store.get_recipe_job(job_id, session.user_id)
|
||||||
|
finally:
|
||||||
|
store.close()
|
||||||
|
|
||||||
|
row = await asyncio.to_thread(_get, session.db)
|
||||||
|
if row is None:
|
||||||
|
raise HTTPException(status_code=404, detail="Job not found.")
|
||||||
|
|
||||||
|
result = None
|
||||||
|
if row["status"] == "done" and row["result"]:
|
||||||
|
result = RecipeResult.model_validate_json(row["result"])
|
||||||
|
|
||||||
|
return RecipeJobStatus(
|
||||||
|
job_id=row["job_id"],
|
||||||
|
status=row["status"],
|
||||||
|
result=result,
|
||||||
|
error=row["error"],
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@router.get("/browse/domains")
|
@router.get("/browse/domains")
|
||||||
async def list_browse_domains(
|
async def list_browse_domains(
|
||||||
session: CloudUser = Depends(get_session),
|
session: CloudUser = Depends(get_session),
|
||||||
|
|
|
||||||
|
|
@ -66,6 +66,13 @@ class Settings:
|
||||||
|
|
||||||
# Feature flags
|
# Feature flags
|
||||||
ENABLE_OCR: bool = os.environ.get("ENABLE_OCR", "false").lower() in ("1", "true", "yes")
|
ENABLE_OCR: bool = os.environ.get("ENABLE_OCR", "false").lower() in ("1", "true", "yes")
|
||||||
|
# Use OrchestratedScheduler (coordinator-aware, multi-GPU fan-out) instead of
|
||||||
|
# LocalScheduler. Defaults to true in CLOUD_MODE; can be set independently
|
||||||
|
# for multi-GPU local rigs that don't need full cloud auth.
|
||||||
|
USE_ORCH_SCHEDULER: bool | None = (
|
||||||
|
None if os.environ.get("USE_ORCH_SCHEDULER") is None
|
||||||
|
else os.environ.get("USE_ORCH_SCHEDULER", "").lower() in ("1", "true", "yes")
|
||||||
|
)
|
||||||
|
|
||||||
# Runtime
|
# Runtime
|
||||||
DEBUG: bool = os.environ.get("DEBUG", "false").lower() in ("1", "true", "yes")
|
DEBUG: bool = os.environ.get("DEBUG", "false").lower() in ("1", "true", "yes")
|
||||||
|
|
|
||||||
14
app/db/migrations/034_recipe_jobs.sql
Normal file
14
app/db/migrations/034_recipe_jobs.sql
Normal file
|
|
@ -0,0 +1,14 @@
|
||||||
|
-- Migration 034: async recipe generation job queue
|
||||||
|
CREATE TABLE IF NOT EXISTS recipe_jobs (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
job_id TEXT NOT NULL UNIQUE,
|
||||||
|
user_id TEXT NOT NULL,
|
||||||
|
status TEXT NOT NULL DEFAULT 'queued',
|
||||||
|
request TEXT NOT NULL,
|
||||||
|
result TEXT,
|
||||||
|
error TEXT,
|
||||||
|
created_at TEXT NOT NULL DEFAULT (datetime('now')),
|
||||||
|
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
|
||||||
|
);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_recipe_jobs_job_id ON recipe_jobs (job_id);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_recipe_jobs_user_id ON recipe_jobs (user_id, created_at DESC);
|
||||||
|
|
@ -736,6 +736,41 @@ class Store:
|
||||||
row = self._fetch_one("SELECT * FROM recipes WHERE id = ?", (recipe_id,))
|
row = self._fetch_one("SELECT * FROM recipes WHERE id = ?", (recipe_id,))
|
||||||
return row
|
return row
|
||||||
|
|
||||||
|
# --- Async recipe jobs ---
|
||||||
|
|
||||||
|
def create_recipe_job(self, job_id: str, user_id: str, request_json: str) -> sqlite3.Row:
|
||||||
|
return self._insert_returning(
|
||||||
|
"INSERT INTO recipe_jobs (job_id, user_id, status, request) VALUES (?,?,?,?) RETURNING *",
|
||||||
|
(job_id, user_id, "queued", request_json),
|
||||||
|
)
|
||||||
|
|
||||||
|
def get_recipe_job(self, job_id: str, user_id: str) -> sqlite3.Row | None:
|
||||||
|
return self._fetch_one(
|
||||||
|
"SELECT * FROM recipe_jobs WHERE job_id=? AND user_id=?",
|
||||||
|
(job_id, user_id),
|
||||||
|
)
|
||||||
|
|
||||||
|
def update_recipe_job_running(self, job_id: str) -> None:
|
||||||
|
self.conn.execute(
|
||||||
|
"UPDATE recipe_jobs SET status='running', updated_at=datetime('now') WHERE job_id=?",
|
||||||
|
(job_id,),
|
||||||
|
)
|
||||||
|
self.conn.commit()
|
||||||
|
|
||||||
|
def complete_recipe_job(self, job_id: str, result_json: str) -> None:
|
||||||
|
self.conn.execute(
|
||||||
|
"UPDATE recipe_jobs SET status='done', result=?, updated_at=datetime('now') WHERE job_id=?",
|
||||||
|
(result_json, job_id),
|
||||||
|
)
|
||||||
|
self.conn.commit()
|
||||||
|
|
||||||
|
def fail_recipe_job(self, job_id: str, error: str) -> None:
|
||||||
|
self.conn.execute(
|
||||||
|
"UPDATE recipe_jobs SET status='failed', error=?, updated_at=datetime('now') WHERE job_id=?",
|
||||||
|
(error, job_id),
|
||||||
|
)
|
||||||
|
self.conn.commit()
|
||||||
|
|
||||||
def upsert_built_recipe(
|
def upsert_built_recipe(
|
||||||
self,
|
self,
|
||||||
external_id: str,
|
external_id: str,
|
||||||
|
|
|
||||||
|
|
@ -61,6 +61,18 @@ class RecipeResult(BaseModel):
|
||||||
orch_fallback: bool = False # True when orch budget exhausted; fell back to local LLM
|
orch_fallback: bool = False # True when orch budget exhausted; fell back to local LLM
|
||||||
|
|
||||||
|
|
||||||
|
class RecipeJobQueued(BaseModel):
|
||||||
|
job_id: str
|
||||||
|
status: str = "queued"
|
||||||
|
|
||||||
|
|
||||||
|
class RecipeJobStatus(BaseModel):
|
||||||
|
job_id: str
|
||||||
|
status: str
|
||||||
|
result: RecipeResult | None = None
|
||||||
|
error: str | None = None
|
||||||
|
|
||||||
|
|
||||||
class NutritionFilters(BaseModel):
|
class NutritionFilters(BaseModel):
|
||||||
"""Optional per-serving upper bounds for macro filtering. None = no filter."""
|
"""Optional per-serving upper bounds for macro filtering. None = no filter."""
|
||||||
max_calories: float | None = None
|
max_calories: float | None = None
|
||||||
|
|
|
||||||
|
|
@ -181,6 +181,19 @@ class LLMRecipeGenerator:
|
||||||
try:
|
try:
|
||||||
alloc = ctx.__enter__()
|
alloc = ctx.__enter__()
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
|
msg = str(exc)
|
||||||
|
# 429 = coordinator at capacity (all nodes at max_concurrent limit).
|
||||||
|
# Don't fall back to LLMRouter — it's also overloaded and the slow
|
||||||
|
# fallback causes nginx 504s. Return "" fast so the caller degrades
|
||||||
|
# gracefully (empty recipe result) rather than timing out.
|
||||||
|
if "429" in msg or "max_concurrent" in msg.lower():
|
||||||
|
logger.info("cf-orch at capacity — returning empty result (graceful degradation)")
|
||||||
|
if ctx is not None:
|
||||||
|
try:
|
||||||
|
ctx.__exit__(None, None, None)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
return ""
|
||||||
logger.debug("cf-orch allocation failed, falling back to LLMRouter: %s", exc)
|
logger.debug("cf-orch allocation failed, falling back to LLMRouter: %s", exc)
|
||||||
ctx = None # __enter__ raised — do not call __exit__
|
ctx = None # __enter__ raised — do not call __exit__
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -22,7 +22,7 @@ from app.services.expiration_predictor import ExpirationPredictor
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
LLM_TASK_TYPES: frozenset[str] = frozenset({"expiry_llm_fallback"})
|
LLM_TASK_TYPES: frozenset[str] = frozenset({"expiry_llm_fallback", "recipe_llm"})
|
||||||
|
|
||||||
VRAM_BUDGETS: dict[str, float] = {
|
VRAM_BUDGETS: dict[str, float] = {
|
||||||
# ExpirationPredictor uses a small LLM (16 tokens out, single pass).
|
# ExpirationPredictor uses a small LLM (16 tokens out, single pass).
|
||||||
|
|
@ -88,6 +88,8 @@ def run_task(
|
||||||
try:
|
try:
|
||||||
if task_type == "expiry_llm_fallback":
|
if task_type == "expiry_llm_fallback":
|
||||||
_run_expiry_llm_fallback(db_path, job_id, params)
|
_run_expiry_llm_fallback(db_path, job_id, params)
|
||||||
|
elif task_type == "recipe_llm":
|
||||||
|
_run_recipe_llm(db_path, job_id, params)
|
||||||
else:
|
else:
|
||||||
raise ValueError(f"Unknown kiwi task type: {task_type!r}")
|
raise ValueError(f"Unknown kiwi task type: {task_type!r}")
|
||||||
_update_task_status(db_path, task_id, "completed")
|
_update_task_status(db_path, task_id, "completed")
|
||||||
|
|
@ -143,3 +145,41 @@ def _run_expiry_llm_fallback(
|
||||||
expiry,
|
expiry,
|
||||||
days,
|
days,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _run_recipe_llm(db_path: Path, _job_id_int: int, params: str | None) -> None:
|
||||||
|
"""Run LLM recipe generation for an async recipe job.
|
||||||
|
|
||||||
|
params JSON keys:
|
||||||
|
job_id (required) — recipe_jobs.job_id string (e.g. "rec_a1b2c3...")
|
||||||
|
|
||||||
|
Creates its own Store — follows same pattern as _suggest_in_thread.
|
||||||
|
MUST call store.fail_recipe_job() before re-raising so recipe_jobs.status
|
||||||
|
doesn't stay 'running' while background_tasks shows 'failed'.
|
||||||
|
"""
|
||||||
|
from app.db.store import Store
|
||||||
|
from app.models.schemas.recipe import RecipeRequest
|
||||||
|
from app.services.recipe.recipe_engine import RecipeEngine
|
||||||
|
|
||||||
|
p = json.loads(params or "{}")
|
||||||
|
recipe_job_id: str = p.get("job_id", "")
|
||||||
|
if not recipe_job_id:
|
||||||
|
raise ValueError("recipe_llm: 'job_id' is required in params")
|
||||||
|
|
||||||
|
store = Store(db_path)
|
||||||
|
try:
|
||||||
|
store.update_recipe_job_running(recipe_job_id)
|
||||||
|
row = store._fetch_one(
|
||||||
|
"SELECT request FROM recipe_jobs WHERE job_id=?", (recipe_job_id,)
|
||||||
|
)
|
||||||
|
if row is None:
|
||||||
|
raise ValueError(f"recipe_llm: recipe_jobs row not found: {recipe_job_id!r}")
|
||||||
|
req = RecipeRequest.model_validate_json(row["request"])
|
||||||
|
result = RecipeEngine(store).suggest(req)
|
||||||
|
store.complete_recipe_job(recipe_job_id, result.model_dump_json())
|
||||||
|
log.info("recipe_llm: job %s completed (%d suggestion(s))", recipe_job_id, len(result.suggestions))
|
||||||
|
except Exception as exc:
|
||||||
|
store.fail_recipe_job(recipe_job_id, str(exc))
|
||||||
|
raise
|
||||||
|
finally:
|
||||||
|
store.close()
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,10 @@
|
||||||
# app/tasks/scheduler.py
|
# app/tasks/scheduler.py
|
||||||
"""Kiwi LLM task scheduler — thin shim over circuitforge_core.tasks.scheduler."""
|
"""Kiwi LLM task scheduler — thin shim over circuitforge_core.tasks.scheduler.
|
||||||
|
|
||||||
|
Local mode (CLOUD_MODE unset): LocalScheduler — simple FIFO, no coordinator.
|
||||||
|
Cloud mode (CLOUD_MODE=true): OrchestratedScheduler — coordinator-aware, fans
|
||||||
|
out concurrent jobs across all registered cf-orch GPU nodes.
|
||||||
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
@ -7,15 +12,68 @@ from pathlib import Path
|
||||||
from circuitforge_core.tasks.scheduler import (
|
from circuitforge_core.tasks.scheduler import (
|
||||||
TaskScheduler,
|
TaskScheduler,
|
||||||
get_scheduler as _base_get_scheduler,
|
get_scheduler as _base_get_scheduler,
|
||||||
reset_scheduler, # re-export for tests
|
reset_scheduler as _reset_local, # re-export for tests
|
||||||
)
|
)
|
||||||
|
|
||||||
|
from app.cloud_session import CLOUD_MODE
|
||||||
from app.core.config import settings
|
from app.core.config import settings
|
||||||
from app.tasks.runner import LLM_TASK_TYPES, VRAM_BUDGETS, run_task
|
from app.tasks.runner import LLM_TASK_TYPES, VRAM_BUDGETS, run_task
|
||||||
|
|
||||||
|
|
||||||
|
def _orch_available() -> bool:
|
||||||
|
"""Return True if circuitforge_orch is installed in this environment."""
|
||||||
|
try:
|
||||||
|
import circuitforge_orch # noqa: F401
|
||||||
|
return True
|
||||||
|
except ImportError:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def _use_orch() -> bool:
|
||||||
|
"""Return True if the OrchestratedScheduler should be used.
|
||||||
|
|
||||||
|
Priority order:
|
||||||
|
1. USE_ORCH_SCHEDULER env var — explicit override always wins.
|
||||||
|
2. CLOUD_MODE=true — use orch in managed cloud deployments.
|
||||||
|
3. circuitforge_orch installed — paid+ local users who have cf-orch
|
||||||
|
set up get coordinator-aware scheduling (local GPU first) automatically.
|
||||||
|
"""
|
||||||
|
override = settings.USE_ORCH_SCHEDULER
|
||||||
|
if override is not None:
|
||||||
|
return override
|
||||||
|
return CLOUD_MODE or _orch_available()
|
||||||
|
|
||||||
|
|
||||||
def get_scheduler(db_path: Path) -> TaskScheduler:
|
def get_scheduler(db_path: Path) -> TaskScheduler:
|
||||||
"""Return the process-level TaskScheduler singleton for Kiwi."""
|
"""Return the process-level TaskScheduler singleton for Kiwi.
|
||||||
|
|
||||||
|
OrchestratedScheduler: coordinator-aware, fans out concurrent jobs across
|
||||||
|
all registered cf-orch GPU nodes. Active when USE_ORCH_SCHEDULER=true,
|
||||||
|
CLOUD_MODE=true, or circuitforge_orch is installed locally (paid+ users
|
||||||
|
running their own cf-orch stack get this automatically; local GPU is
|
||||||
|
preferred by the coordinator's allocation queue).
|
||||||
|
|
||||||
|
LocalScheduler: serial FIFO, no coordinator dependency. Free-tier local
|
||||||
|
installs without circuitforge_orch installed use this automatically.
|
||||||
|
"""
|
||||||
|
if _use_orch():
|
||||||
|
try:
|
||||||
|
from circuitforge_orch.scheduler import get_orch_scheduler
|
||||||
|
except ImportError:
|
||||||
|
import logging
|
||||||
|
logging.getLogger(__name__).warning(
|
||||||
|
"circuitforge_orch not installed — falling back to LocalScheduler"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
return get_orch_scheduler(
|
||||||
|
db_path=db_path,
|
||||||
|
run_task_fn=run_task,
|
||||||
|
task_types=LLM_TASK_TYPES,
|
||||||
|
vram_budgets=VRAM_BUDGETS,
|
||||||
|
coordinator_url=settings.COORDINATOR_URL,
|
||||||
|
service_name="kiwi",
|
||||||
|
)
|
||||||
|
|
||||||
return _base_get_scheduler(
|
return _base_get_scheduler(
|
||||||
db_path=db_path,
|
db_path=db_path,
|
||||||
run_task_fn=run_task,
|
run_task_fn=run_task,
|
||||||
|
|
@ -24,3 +82,15 @@ def get_scheduler(db_path: Path) -> TaskScheduler:
|
||||||
coordinator_url=settings.COORDINATOR_URL,
|
coordinator_url=settings.COORDINATOR_URL,
|
||||||
service_name="kiwi",
|
service_name="kiwi",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def reset_scheduler() -> None:
|
||||||
|
"""Shut down and clear the active scheduler singleton. TEST TEARDOWN ONLY."""
|
||||||
|
if _use_orch():
|
||||||
|
try:
|
||||||
|
from circuitforge_orch.scheduler import reset_orch_scheduler
|
||||||
|
reset_orch_scheduler()
|
||||||
|
return
|
||||||
|
except ImportError:
|
||||||
|
pass
|
||||||
|
_reset_local()
|
||||||
|
|
|
||||||
|
|
@ -8,9 +8,9 @@ services:
|
||||||
# Docker can follow the symlink inside the container.
|
# Docker can follow the symlink inside the container.
|
||||||
- /Library/Assets/kiwi:/Library/Assets/kiwi:rw
|
- /Library/Assets/kiwi:/Library/Assets/kiwi:rw
|
||||||
|
|
||||||
# cf-orch agent sidecar: registers kiwi as a GPU node with the coordinator.
|
# cf-orch agent sidecar: registers this machine as GPU node "sif" with the coordinator.
|
||||||
# The API scheduler uses COORDINATOR_URL to lease VRAM cooperatively; this
|
# The API scheduler uses COORDINATOR_URL to lease VRAM cooperatively; this
|
||||||
# agent makes kiwi's VRAM usage visible on the orchestrator dashboard.
|
# agent makes the local VRAM usage visible on the orchestrator dashboard.
|
||||||
cf-orch-agent:
|
cf-orch-agent:
|
||||||
image: kiwi-api # reuse local api image — cf-core already installed there
|
image: kiwi-api # reuse local api image — cf-core already installed there
|
||||||
network_mode: host
|
network_mode: host
|
||||||
|
|
@ -21,7 +21,7 @@ services:
|
||||||
command: >
|
command: >
|
||||||
conda run -n kiwi cf-orch agent
|
conda run -n kiwi cf-orch agent
|
||||||
--coordinator ${COORDINATOR_URL:-http://10.1.10.71:7700}
|
--coordinator ${COORDINATOR_URL:-http://10.1.10.71:7700}
|
||||||
--node-id kiwi
|
--node-id sif
|
||||||
--host 0.0.0.0
|
--host 0.0.0.0
|
||||||
--port 7702
|
--port 7702
|
||||||
--advertise-host ${CF_ORCH_ADVERTISE_HOST:-10.1.10.71}
|
--advertise-host ${CF_ORCH_ADVERTISE_HOST:-10.1.10.71}
|
||||||
|
|
|
||||||
74
config/llm.yaml.example
Normal file
74
config/llm.yaml.example
Normal file
|
|
@ -0,0 +1,74 @@
|
||||||
|
# Kiwi — LLM backend configuration
|
||||||
|
#
|
||||||
|
# Copy to ~/.config/circuitforge/llm.yaml (shared across all CF products)
|
||||||
|
# or to config/llm.yaml (Kiwi-local, takes precedence).
|
||||||
|
#
|
||||||
|
# Kiwi uses LLMs for:
|
||||||
|
# - Expiry prediction fallback (unknown products not in the lookup table)
|
||||||
|
# - Meal planning suggestions
|
||||||
|
#
|
||||||
|
# Local inference (Ollama / vLLM) is the default path — no API key required.
|
||||||
|
# BYOK (bring your own key): set api_key_env to point at your API key env var.
|
||||||
|
# cf-orch trunk: set CF_ORCH_URL env var to allocate cf-text on-demand via
|
||||||
|
# the coordinator instead of hitting a static URL.
|
||||||
|
|
||||||
|
backends:
|
||||||
|
ollama:
|
||||||
|
type: openai_compat
|
||||||
|
enabled: true
|
||||||
|
base_url: http://localhost:11434/v1
|
||||||
|
model: llama3.2:3b
|
||||||
|
api_key: ollama
|
||||||
|
supports_images: false
|
||||||
|
|
||||||
|
vllm:
|
||||||
|
type: openai_compat
|
||||||
|
enabled: false
|
||||||
|
base_url: http://localhost:8000/v1
|
||||||
|
model: __auto__ # resolved from /v1/models at runtime
|
||||||
|
api_key: ''
|
||||||
|
supports_images: false
|
||||||
|
|
||||||
|
# ── cf-orch trunk services ──────────────────────────────────────────────────
|
||||||
|
# These allocate via cf-orch rather than connecting to a static URL.
|
||||||
|
# cf-orch starts the service on-demand and returns its live URL.
|
||||||
|
# Set CF_ORCH_URL env var or fill in url below; leave enabled: false if
|
||||||
|
# cf-orch is not deployed in your environment.
|
||||||
|
|
||||||
|
cf_text:
|
||||||
|
type: openai_compat
|
||||||
|
enabled: false
|
||||||
|
base_url: http://localhost:8008/v1 # fallback when cf-orch is not available
|
||||||
|
model: __auto__
|
||||||
|
api_key: any
|
||||||
|
supports_images: false
|
||||||
|
cf_orch:
|
||||||
|
service: cf-text
|
||||||
|
# model_candidates: leave empty to use the service's default_model,
|
||||||
|
# or specify a catalog alias (e.g. "qwen2.5-3b").
|
||||||
|
model_candidates: []
|
||||||
|
ttl_s: 3600
|
||||||
|
|
||||||
|
# ── Cloud / BYOK ───────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
anthropic:
|
||||||
|
type: anthropic
|
||||||
|
enabled: false
|
||||||
|
model: claude-haiku-4-5-20251001
|
||||||
|
api_key_env: ANTHROPIC_API_KEY
|
||||||
|
supports_images: false
|
||||||
|
|
||||||
|
openai:
|
||||||
|
type: openai_compat
|
||||||
|
enabled: false
|
||||||
|
base_url: https://api.openai.com/v1
|
||||||
|
model: gpt-4o-mini
|
||||||
|
api_key_env: OPENAI_API_KEY
|
||||||
|
supports_images: false
|
||||||
|
|
||||||
|
fallback_order:
|
||||||
|
- cf_text
|
||||||
|
- ollama
|
||||||
|
- vllm
|
||||||
|
- anthropic
|
||||||
|
- openai
|
||||||
|
|
@ -287,7 +287,10 @@
|
||||||
@click="handleSuggest"
|
@click="handleSuggest"
|
||||||
>
|
>
|
||||||
<span v-if="recipesStore.loading && !isLoadingMore">
|
<span v-if="recipesStore.loading && !isLoadingMore">
|
||||||
<span class="spinner spinner-sm inline-spinner"></span> Finding recipes…
|
<span class="spinner spinner-sm inline-spinner"></span>
|
||||||
|
<span v-if="recipesStore.jobStatus === 'queued'">Queued…</span>
|
||||||
|
<span v-else-if="recipesStore.jobStatus === 'running'">Generating…</span>
|
||||||
|
<span v-else>Finding recipes…</span>
|
||||||
</span>
|
</span>
|
||||||
<span v-else>Suggest Recipes</span>
|
<span v-else>Suggest Recipes</span>
|
||||||
</button>
|
</button>
|
||||||
|
|
@ -312,7 +315,9 @@
|
||||||
|
|
||||||
<!-- Screen reader announcement for loading + results -->
|
<!-- Screen reader announcement for loading + results -->
|
||||||
<div aria-live="polite" aria-atomic="true" class="sr-only">
|
<div aria-live="polite" aria-atomic="true" class="sr-only">
|
||||||
<span v-if="recipesStore.loading">Finding recipes…</span>
|
<span v-if="recipesStore.loading && recipesStore.jobStatus === 'queued'">Recipe request queued, waiting for model…</span>
|
||||||
|
<span v-else-if="recipesStore.loading && recipesStore.jobStatus === 'running'">Generating your recipe now…</span>
|
||||||
|
<span v-else-if="recipesStore.loading">Finding recipes…</span>
|
||||||
<span v-else-if="recipesStore.result">
|
<span v-else-if="recipesStore.result">
|
||||||
{{ filteredSuggestions.length }} recipe{{ filteredSuggestions.length !== 1 ? 's' : '' }} found
|
{{ filteredSuggestions.length }} recipe{{ filteredSuggestions.length !== 1 ? 's' : '' }} found
|
||||||
</span>
|
</span>
|
||||||
|
|
|
||||||
|
|
@ -524,6 +524,15 @@ export interface RecipeResult {
|
||||||
rate_limit_count: number
|
rate_limit_count: number
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export type RecipeJobStatusValue = 'queued' | 'running' | 'done' | 'failed'
|
||||||
|
|
||||||
|
export interface RecipeJobStatus {
|
||||||
|
job_id: string
|
||||||
|
status: RecipeJobStatusValue
|
||||||
|
result: RecipeResult | null
|
||||||
|
error: string | null
|
||||||
|
}
|
||||||
|
|
||||||
export interface RecipeRequest {
|
export interface RecipeRequest {
|
||||||
pantry_items: string[]
|
pantry_items: string[]
|
||||||
secondary_pantry_items: Record<string, string>
|
secondary_pantry_items: Record<string, string>
|
||||||
|
|
@ -593,6 +602,18 @@ export const recipesAPI = {
|
||||||
const response = await api.post('/recipes/suggest', req, { timeout: 120000 })
|
const response = await api.post('/recipes/suggest', req, { timeout: 120000 })
|
||||||
return response.data
|
return response.data
|
||||||
},
|
},
|
||||||
|
|
||||||
|
/** Submit an async job for L3/L4 generation. Returns job_id + initial status. */
|
||||||
|
async suggestAsync(req: RecipeRequest): Promise<{ job_id: string; status: string }> {
|
||||||
|
const response = await api.post('/recipes/suggest', req, { params: { async: 'true' }, timeout: 15000 })
|
||||||
|
return response.data
|
||||||
|
},
|
||||||
|
|
||||||
|
/** Poll an async job. Returns the full status including result once done. */
|
||||||
|
async pollJob(jobId: string): Promise<RecipeJobStatus> {
|
||||||
|
const response = await api.get(`/recipes/jobs/${jobId}`, { timeout: 10000 })
|
||||||
|
return response.data
|
||||||
|
},
|
||||||
async getRecipe(id: number): Promise<RecipeSuggestion> {
|
async getRecipe(id: number): Promise<RecipeSuggestion> {
|
||||||
const response = await api.get(`/recipes/${id}`)
|
const response = await api.get(`/recipes/${id}`)
|
||||||
return response.data
|
return response.data
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,7 @@
|
||||||
|
|
||||||
import { defineStore } from 'pinia'
|
import { defineStore } from 'pinia'
|
||||||
import { ref, computed, watch } from 'vue'
|
import { ref, computed, watch } from 'vue'
|
||||||
import { recipesAPI, type RecipeResult, type RecipeSuggestion, type RecipeRequest, type NutritionFilters } from '../services/api'
|
import { recipesAPI, type RecipeResult, type RecipeSuggestion, type RecipeRequest, type RecipeJobStatusValue, type NutritionFilters } from '../services/api'
|
||||||
|
|
||||||
const DISMISSED_KEY = 'kiwi:dismissed_recipes'
|
const DISMISSED_KEY = 'kiwi:dismissed_recipes'
|
||||||
const DISMISS_TTL_MS = 7 * 24 * 60 * 60 * 1000
|
const DISMISS_TTL_MS = 7 * 24 * 60 * 60 * 1000
|
||||||
|
|
@ -121,6 +121,7 @@ export const useRecipesStore = defineStore('recipes', () => {
|
||||||
const result = ref<RecipeResult | null>(null)
|
const result = ref<RecipeResult | null>(null)
|
||||||
const loading = ref(false)
|
const loading = ref(false)
|
||||||
const error = ref<string | null>(null)
|
const error = ref<string | null>(null)
|
||||||
|
const jobStatus = ref<RecipeJobStatusValue | null>(null)
|
||||||
|
|
||||||
// Request parameters
|
// Request parameters
|
||||||
const level = ref(1)
|
const level = ref(1)
|
||||||
|
|
@ -199,18 +200,57 @@ export const useRecipesStore = defineStore('recipes', () => {
|
||||||
async function suggest(pantryItems: string[], secondaryPantryItems: Record<string, string> = {}) {
|
async function suggest(pantryItems: string[], secondaryPantryItems: Record<string, string> = {}) {
|
||||||
loading.value = true
|
loading.value = true
|
||||||
error.value = null
|
error.value = null
|
||||||
|
jobStatus.value = null
|
||||||
seenIds.value = new Set()
|
seenIds.value = new Set()
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
if (level.value >= 3) {
|
||||||
|
await _suggestAsync(pantryItems, secondaryPantryItems)
|
||||||
|
} else {
|
||||||
result.value = await recipesAPI.suggest(_buildRequest(pantryItems, secondaryPantryItems))
|
result.value = await recipesAPI.suggest(_buildRequest(pantryItems, secondaryPantryItems))
|
||||||
_trackSeen(result.value.suggestions)
|
_trackSeen(result.value.suggestions)
|
||||||
|
}
|
||||||
} catch (err: unknown) {
|
} catch (err: unknown) {
|
||||||
error.value = err instanceof Error ? err.message : 'Failed to get recipe suggestions'
|
error.value = err instanceof Error ? err.message : 'Failed to get recipe suggestions'
|
||||||
} finally {
|
} finally {
|
||||||
loading.value = false
|
loading.value = false
|
||||||
|
jobStatus.value = null
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function _suggestAsync(pantryItems: string[], secondaryPantryItems: Record<string, string>) {
|
||||||
|
const queued = await recipesAPI.suggestAsync(_buildRequest(pantryItems, secondaryPantryItems))
|
||||||
|
|
||||||
|
// CLOUD_MODE or future sync fallback: server returned result directly (status 200)
|
||||||
|
if ('suggestions' in queued) {
|
||||||
|
result.value = queued as unknown as RecipeResult
|
||||||
|
_trackSeen(result.value.suggestions)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
jobStatus.value = 'queued'
|
||||||
|
const { job_id } = queued
|
||||||
|
const deadline = Date.now() + 90_000
|
||||||
|
const POLL_MS = 2_500
|
||||||
|
|
||||||
|
while (Date.now() < deadline) {
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, POLL_MS))
|
||||||
|
const poll = await recipesAPI.pollJob(job_id)
|
||||||
|
jobStatus.value = poll.status
|
||||||
|
|
||||||
|
if (poll.status === 'done') {
|
||||||
|
result.value = poll.result
|
||||||
|
if (result.value) _trackSeen(result.value.suggestions)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if (poll.status === 'failed') {
|
||||||
|
throw new Error(poll.error ?? 'Recipe generation failed')
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new Error('Recipe generation timed out — the model may be busy. Try again.')
|
||||||
|
}
|
||||||
|
|
||||||
async function loadMore(pantryItems: string[], secondaryPantryItems: Record<string, string> = {}) {
|
async function loadMore(pantryItems: string[], secondaryPantryItems: Record<string, string> = {}) {
|
||||||
if (!result.value || loading.value) return
|
if (!result.value || loading.value) return
|
||||||
loading.value = true
|
loading.value = true
|
||||||
|
|
@ -308,6 +348,7 @@ export const useRecipesStore = defineStore('recipes', () => {
|
||||||
result,
|
result,
|
||||||
loading,
|
loading,
|
||||||
error,
|
error,
|
||||||
|
jobStatus,
|
||||||
level,
|
level,
|
||||||
constraints,
|
constraints,
|
||||||
allergies,
|
allergies,
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue