feat(recipes): async L3/L4 recipe job queue with poll endpoint

Adds the recipe_jobs table and background task pipeline for level 3/4
recipe generation. POST ?async=true returns 202 with job_id; clients
poll GET /recipes/jobs/{job_id} until status=done.

Key fix: _enqueue_recipe_job now calls scheduler.enqueue() after
insert_task() to wake the in-memory work queue immediately. Without
this, tasks sat in 'queued' until the scheduler's 30s idle cycle or
an API restart triggered _load_queued_tasks().

- Migration 034: recipe_jobs table (job_id, user_id, status, request,
  result, error) with indexes on job_id and user_id/created_at
- Store: create/get/update_running/complete/fail recipe job methods
- runner.py: recipe_llm task type + _run_recipe_llm handler; MUST
  call fail_recipe_job() before re-raising so status stays consistent
- CLOUD_MODE guard: falls back to sync generation (scheduler only
  polls shared settings DB, not per-user DBs)
- L4 wildcard is covered by the same req.level in (3, 4) dispatch
This commit is contained in:
pyr0ball 2026-04-19 21:44:27 -07:00
parent eba536070c
commit ed4595d960
5 changed files with 180 additions and 3 deletions

View file

@ -16,6 +16,7 @@ from app.db.store import Store
from app.models.schemas.recipe import ( from app.models.schemas.recipe import (
AssemblyTemplateOut, AssemblyTemplateOut,
BuildRequest, BuildRequest,
RecipeJobStatus,
RecipeRequest, RecipeRequest,
RecipeResult, RecipeResult,
RecipeSuggestion, RecipeSuggestion,
@ -57,12 +58,50 @@ def _suggest_in_thread(db_path: Path, req: RecipeRequest) -> RecipeResult:
store.close() store.close()
@router.post("/suggest", response_model=RecipeResult) async def _enqueue_recipe_job(session: CloudUser, req: RecipeRequest):
"""Queue an async recipe_llm job and return 202 with job_id.
Falls back to synchronous generation in CLOUD_MODE (scheduler polls only
the shared settings DB, not per-user DBs see snipe#45 / kiwi backlog).
"""
import json
import uuid
from fastapi.responses import JSONResponse
from app.cloud_session import CLOUD_MODE
from app.tasks.runner import insert_task
if CLOUD_MODE:
log.warning("recipe_llm async jobs not supported in CLOUD_MODE — falling back to sync")
result = await asyncio.to_thread(_suggest_in_thread, session.db, req)
return result
job_id = f"rec_{uuid.uuid4().hex}"
def _create(db_path: Path) -> int:
store = Store(db_path)
try:
row = store.create_recipe_job(job_id, session.user_id, req.model_dump_json())
return row["id"]
finally:
store.close()
int_id = await asyncio.to_thread(_create, session.db)
params_json = json.dumps({"job_id": job_id})
task_id, is_new = insert_task(session.db, "recipe_llm", int_id, params=params_json)
if is_new:
from app.tasks.scheduler import get_scheduler
get_scheduler(session.db).enqueue(task_id, "recipe_llm", int_id, params_json)
return JSONResponse(content={"job_id": job_id, "status": "queued"}, status_code=202)
@router.post("/suggest")
async def suggest_recipes( async def suggest_recipes(
req: RecipeRequest, req: RecipeRequest,
async_mode: bool = Query(default=False, alias="async"),
session: CloudUser = Depends(get_session), session: CloudUser = Depends(get_session),
store: Store = Depends(get_store), store: Store = Depends(get_store),
) -> RecipeResult: ):
log.info("recipes auth=%s tier=%s level=%s", _auth_label(session.user_id), session.tier, req.level) log.info("recipes auth=%s tier=%s level=%s", _auth_label(session.user_id), session.tier, req.level)
# Inject session-authoritative tier/byok immediately — client-supplied values are ignored. # Inject session-authoritative tier/byok immediately — client-supplied values are ignored.
# Also read stored unit_system preference; default to metric if not set. # Also read stored unit_system preference; default to metric if not set.
@ -95,12 +134,49 @@ async def suggest_recipes(
req = req.model_copy(update={"level": 2}) req = req.model_copy(update={"level": 2})
orch_fallback = True orch_fallback = True
if req.level in (3, 4) and async_mode:
return await _enqueue_recipe_job(session, req)
result = await asyncio.to_thread(_suggest_in_thread, session.db, req) result = await asyncio.to_thread(_suggest_in_thread, session.db, req)
if orch_fallback: if orch_fallback:
result = result.model_copy(update={"orch_fallback": True}) result = result.model_copy(update={"orch_fallback": True})
return result return result
@router.get("/jobs/{job_id}", response_model=RecipeJobStatus)
async def get_recipe_job_status(
job_id: str,
session: CloudUser = Depends(get_session),
) -> RecipeJobStatus:
"""Poll the status of an async recipe generation job.
Returns 404 when job_id is unknown or belongs to a different user.
On status='done' with suggestions=[], the LLM returned empty client
should show a 'no recipe generated, try again' message.
"""
def _get(db_path: Path) -> dict | None:
store = Store(db_path)
try:
return store.get_recipe_job(job_id, session.user_id)
finally:
store.close()
row = await asyncio.to_thread(_get, session.db)
if row is None:
raise HTTPException(status_code=404, detail="Job not found.")
result = None
if row["status"] == "done" and row["result"]:
result = RecipeResult.model_validate_json(row["result"])
return RecipeJobStatus(
job_id=row["job_id"],
status=row["status"],
result=result,
error=row["error"],
)
@router.get("/browse/domains") @router.get("/browse/domains")
async def list_browse_domains( async def list_browse_domains(
session: CloudUser = Depends(get_session), session: CloudUser = Depends(get_session),

View file

@ -0,0 +1,14 @@
-- Migration 034: async recipe generation job queue
CREATE TABLE IF NOT EXISTS recipe_jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_id TEXT NOT NULL UNIQUE,
user_id TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'queued',
request TEXT NOT NULL,
result TEXT,
error TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_recipe_jobs_job_id ON recipe_jobs (job_id);
CREATE INDEX IF NOT EXISTS idx_recipe_jobs_user_id ON recipe_jobs (user_id, created_at DESC);

View file

@ -736,6 +736,41 @@ class Store:
row = self._fetch_one("SELECT * FROM recipes WHERE id = ?", (recipe_id,)) row = self._fetch_one("SELECT * FROM recipes WHERE id = ?", (recipe_id,))
return row return row
# --- Async recipe jobs ---
def create_recipe_job(self, job_id: str, user_id: str, request_json: str) -> sqlite3.Row:
return self._insert_returning(
"INSERT INTO recipe_jobs (job_id, user_id, status, request) VALUES (?,?,?,?) RETURNING *",
(job_id, user_id, "queued", request_json),
)
def get_recipe_job(self, job_id: str, user_id: str) -> sqlite3.Row | None:
return self._fetch_one(
"SELECT * FROM recipe_jobs WHERE job_id=? AND user_id=?",
(job_id, user_id),
)
def update_recipe_job_running(self, job_id: str) -> None:
self.conn.execute(
"UPDATE recipe_jobs SET status='running', updated_at=datetime('now') WHERE job_id=?",
(job_id,),
)
self.conn.commit()
def complete_recipe_job(self, job_id: str, result_json: str) -> None:
self.conn.execute(
"UPDATE recipe_jobs SET status='done', result=?, updated_at=datetime('now') WHERE job_id=?",
(result_json, job_id),
)
self.conn.commit()
def fail_recipe_job(self, job_id: str, error: str) -> None:
self.conn.execute(
"UPDATE recipe_jobs SET status='failed', error=?, updated_at=datetime('now') WHERE job_id=?",
(error, job_id),
)
self.conn.commit()
def upsert_built_recipe( def upsert_built_recipe(
self, self,
external_id: str, external_id: str,

View file

@ -61,6 +61,18 @@ class RecipeResult(BaseModel):
orch_fallback: bool = False # True when orch budget exhausted; fell back to local LLM orch_fallback: bool = False # True when orch budget exhausted; fell back to local LLM
class RecipeJobQueued(BaseModel):
job_id: str
status: str = "queued"
class RecipeJobStatus(BaseModel):
job_id: str
status: str
result: RecipeResult | None = None
error: str | None = None
class NutritionFilters(BaseModel): class NutritionFilters(BaseModel):
"""Optional per-serving upper bounds for macro filtering. None = no filter.""" """Optional per-serving upper bounds for macro filtering. None = no filter."""
max_calories: float | None = None max_calories: float | None = None

View file

@ -22,7 +22,7 @@ from app.services.expiration_predictor import ExpirationPredictor
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
LLM_TASK_TYPES: frozenset[str] = frozenset({"expiry_llm_fallback"}) LLM_TASK_TYPES: frozenset[str] = frozenset({"expiry_llm_fallback", "recipe_llm"})
VRAM_BUDGETS: dict[str, float] = { VRAM_BUDGETS: dict[str, float] = {
# ExpirationPredictor uses a small LLM (16 tokens out, single pass). # ExpirationPredictor uses a small LLM (16 tokens out, single pass).
@ -88,6 +88,8 @@ def run_task(
try: try:
if task_type == "expiry_llm_fallback": if task_type == "expiry_llm_fallback":
_run_expiry_llm_fallback(db_path, job_id, params) _run_expiry_llm_fallback(db_path, job_id, params)
elif task_type == "recipe_llm":
_run_recipe_llm(db_path, job_id, params)
else: else:
raise ValueError(f"Unknown kiwi task type: {task_type!r}") raise ValueError(f"Unknown kiwi task type: {task_type!r}")
_update_task_status(db_path, task_id, "completed") _update_task_status(db_path, task_id, "completed")
@ -143,3 +145,41 @@ def _run_expiry_llm_fallback(
expiry, expiry,
days, days,
) )
def _run_recipe_llm(db_path: Path, _job_id_int: int, params: str | None) -> None:
"""Run LLM recipe generation for an async recipe job.
params JSON keys:
job_id (required) recipe_jobs.job_id string (e.g. "rec_a1b2c3...")
Creates its own Store follows same pattern as _suggest_in_thread.
MUST call store.fail_recipe_job() before re-raising so recipe_jobs.status
doesn't stay 'running' while background_tasks shows 'failed'.
"""
from app.db.store import Store
from app.models.schemas.recipe import RecipeRequest
from app.services.recipe.recipe_engine import RecipeEngine
p = json.loads(params or "{}")
recipe_job_id: str = p.get("job_id", "")
if not recipe_job_id:
raise ValueError("recipe_llm: 'job_id' is required in params")
store = Store(db_path)
try:
store.update_recipe_job_running(recipe_job_id)
row = store._fetch_one(
"SELECT request FROM recipe_jobs WHERE job_id=?", (recipe_job_id,)
)
if row is None:
raise ValueError(f"recipe_llm: recipe_jobs row not found: {recipe_job_id!r}")
req = RecipeRequest.model_validate_json(row["request"])
result = RecipeEngine(store).suggest(req)
store.complete_recipe_job(recipe_job_id, result.model_dump_json())
log.info("recipe_llm: job %s completed (%d suggestion(s))", recipe_job_id, len(result.suggestions))
except Exception as exc:
store.fail_recipe_job(recipe_job_id, str(exc))
raise
finally:
store.close()