From ed4595d960b7d0c459e9c16bf6504f9432eee0d4 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Sun, 19 Apr 2026 21:44:27 -0700 Subject: [PATCH] feat(recipes): async L3/L4 recipe job queue with poll endpoint Adds the recipe_jobs table and background task pipeline for level 3/4 recipe generation. POST ?async=true returns 202 with job_id; clients poll GET /recipes/jobs/{job_id} until status=done. Key fix: _enqueue_recipe_job now calls scheduler.enqueue() after insert_task() to wake the in-memory work queue immediately. Without this, tasks sat in 'queued' until the scheduler's 30s idle cycle or an API restart triggered _load_queued_tasks(). - Migration 034: recipe_jobs table (job_id, user_id, status, request, result, error) with indexes on job_id and user_id/created_at - Store: create/get/update_running/complete/fail recipe job methods - runner.py: recipe_llm task type + _run_recipe_llm handler; MUST call fail_recipe_job() before re-raising so status stays consistent - CLOUD_MODE guard: falls back to sync generation (scheduler only polls shared settings DB, not per-user DBs) - L4 wildcard is covered by the same req.level in (3, 4) dispatch --- app/api/endpoints/recipes.py | 80 ++++++++++++++++++++++++++- app/db/migrations/034_recipe_jobs.sql | 14 +++++ app/db/store.py | 35 ++++++++++++ app/models/schemas/recipe.py | 12 ++++ app/tasks/runner.py | 42 +++++++++++++- 5 files changed, 180 insertions(+), 3 deletions(-) create mode 100644 app/db/migrations/034_recipe_jobs.sql diff --git a/app/api/endpoints/recipes.py b/app/api/endpoints/recipes.py index 5900de3..ee65761 100644 --- a/app/api/endpoints/recipes.py +++ b/app/api/endpoints/recipes.py @@ -16,6 +16,7 @@ from app.db.store import Store from app.models.schemas.recipe import ( AssemblyTemplateOut, BuildRequest, + RecipeJobStatus, RecipeRequest, RecipeResult, RecipeSuggestion, @@ -57,12 +58,50 @@ def _suggest_in_thread(db_path: Path, req: RecipeRequest) -> RecipeResult: store.close() -@router.post("/suggest", 
async def _enqueue_recipe_job(session: CloudUser, req: RecipeRequest):
    """Queue an async recipe_llm job and return 202 with job_id.

    Falls back to synchronous generation in CLOUD_MODE (scheduler polls only
    the shared settings DB, not per-user DBs — see snipe#45 / kiwi backlog).

    Args:
        session: Authenticated session; supplies the per-user DB path
            (``session.db``) and the owning ``user_id``.
        req: The recipe request, stored verbatim (as JSON) on the job row so
            the background worker can replay it.

    Returns:
        In CLOUD_MODE, the ``RecipeResult`` produced synchronously.
        Otherwise a ``JSONResponse`` with status 202 and body
        ``{"job_id": ..., "status": "queued"}``.
    """
    import json
    import uuid
    from fastapi.responses import JSONResponse
    from app.cloud_session import CLOUD_MODE
    from app.tasks.runner import insert_task

    if CLOUD_MODE:
        log.warning("recipe_llm async jobs not supported in CLOUD_MODE — falling back to sync")
        return await asyncio.to_thread(_suggest_in_thread, session.db, req)

    job_id = f"rec_{uuid.uuid4().hex}"
    params_json = json.dumps({"job_id": job_id})

    def _persist(db_path: Path) -> tuple[int, int, bool]:
        """Create the recipe_jobs row and its task row; runs in a worker thread."""
        store = Store(db_path)
        try:
            row = store.create_recipe_job(job_id, session.user_id, req.model_dump_json())
            int_id = row["id"]
        finally:
            store.close()
        # insert_task is a synchronous call (presumably a DB write — it takes the
        # DB path), so it is kept off the event loop together with the job insert.
        task_id, is_new = insert_task(db_path, "recipe_llm", int_id, params=params_json)
        return int_id, task_id, is_new

    int_id, task_id, is_new = await asyncio.to_thread(_persist, session.db)
    if is_new:
        # Wake the in-memory work queue right away instead of waiting for the
        # scheduler to discover the queued task on its own.
        from app.tasks.scheduler import get_scheduler
        get_scheduler(session.db).enqueue(task_id, "recipe_llm", int_id, params_json)

    return JSONResponse(content={"job_id": job_id, "status": "queued"}, status_code=202)
@router.get("/jobs/{job_id}", response_model=RecipeJobStatus)
async def get_recipe_job_status(
    job_id: str,
    session: CloudUser = Depends(get_session),
) -> RecipeJobStatus:
    """Poll the status of an async recipe generation job.

    Returns 404 when job_id is unknown or belongs to a different user.
    When status='done' and the result carries suggestions=[], the LLM
    returned nothing — the client should show a 'no recipe generated,
    try again' message.
    """

    def _load_job(db_path: Path) -> dict | None:
        # Short-lived Store per lookup; always closed, even if the query raises.
        job_store = Store(db_path)
        try:
            job_row = job_store.get_recipe_job(job_id, session.user_id)
        finally:
            job_store.close()
        return job_row

    job = await asyncio.to_thread(_load_job, session.db)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found.")

    # Only a finished job with a stored payload has a result to deserialize.
    has_payload = job["status"] == "done" and job["result"]
    parsed = RecipeResult.model_validate_json(job["result"]) if has_payload else None

    return RecipeJobStatus(
        job_id=job["job_id"],
        status=job["status"],
        result=parsed,
        error=job["error"],
    )
+ updated_at TEXT NOT NULL DEFAULT (datetime('now')) +); +CREATE INDEX IF NOT EXISTS idx_recipe_jobs_job_id ON recipe_jobs (job_id); +CREATE INDEX IF NOT EXISTS idx_recipe_jobs_user_id ON recipe_jobs (user_id, created_at DESC); diff --git a/app/db/store.py b/app/db/store.py index a58884b..d4e74fc 100644 --- a/app/db/store.py +++ b/app/db/store.py @@ -736,6 +736,41 @@ class Store: row = self._fetch_one("SELECT * FROM recipes WHERE id = ?", (recipe_id,)) return row + # --- Async recipe jobs --- + + def create_recipe_job(self, job_id: str, user_id: str, request_json: str) -> sqlite3.Row: + return self._insert_returning( + "INSERT INTO recipe_jobs (job_id, user_id, status, request) VALUES (?,?,?,?) RETURNING *", + (job_id, user_id, "queued", request_json), + ) + + def get_recipe_job(self, job_id: str, user_id: str) -> sqlite3.Row | None: + return self._fetch_one( + "SELECT * FROM recipe_jobs WHERE job_id=? AND user_id=?", + (job_id, user_id), + ) + + def update_recipe_job_running(self, job_id: str) -> None: + self.conn.execute( + "UPDATE recipe_jobs SET status='running', updated_at=datetime('now') WHERE job_id=?", + (job_id,), + ) + self.conn.commit() + + def complete_recipe_job(self, job_id: str, result_json: str) -> None: + self.conn.execute( + "UPDATE recipe_jobs SET status='done', result=?, updated_at=datetime('now') WHERE job_id=?", + (result_json, job_id), + ) + self.conn.commit() + + def fail_recipe_job(self, job_id: str, error: str) -> None: + self.conn.execute( + "UPDATE recipe_jobs SET status='failed', error=?, updated_at=datetime('now') WHERE job_id=?", + (error, job_id), + ) + self.conn.commit() + def upsert_built_recipe( self, external_id: str, diff --git a/app/models/schemas/recipe.py b/app/models/schemas/recipe.py index 64c5298..ebb07e7 100644 --- a/app/models/schemas/recipe.py +++ b/app/models/schemas/recipe.py @@ -61,6 +61,18 @@ class RecipeResult(BaseModel): orch_fallback: bool = False # True when orch budget exhausted; fell back to local LLM +class 
def _run_recipe_llm(db_path: Path, _job_id_int: int, params: str | None) -> None:
    """Run LLM recipe generation for an async recipe job.

    params JSON keys:
        job_id (required) — recipe_jobs.job_id string (e.g. "rec_a1b2c3...")

    Creates its own Store and always closes it. On any failure the job row is
    marked 'failed' before the exception is re-raised, so recipe_jobs.status
    doesn't stay 'running' while the caller records the task as failed.

    Args:
        db_path: Database path handed to ``Store``.
        _job_id_int: Integer row id passed by the task runner; unused here —
            the job is looked up by the string ``job_id`` in ``params``.
        params: JSON-encoded object carrying ``job_id``; ``None`` is treated
            as an empty object.

    Raises:
        ValueError: ``params`` has no usable ``job_id``, or no recipe_jobs row
            exists for it.
        Exception: Anything raised while validating the stored request or
            generating the recipe is re-raised unchanged.
    """
    from app.db.store import Store
    from app.models.schemas.recipe import RecipeRequest
    from app.services.recipe.recipe_engine import RecipeEngine

    p = json.loads(params or "{}")
    # Valid JSON that is not an object (e.g. "[]") has no .get(); treat it the
    # same as a missing job_id so the caller gets the intended ValueError.
    recipe_job_id: str = p.get("job_id", "") if isinstance(p, dict) else ""
    if not recipe_job_id:
        raise ValueError("recipe_llm: 'job_id' is required in params")

    store = Store(db_path)
    try:
        store.update_recipe_job_running(recipe_job_id)
        row = store._fetch_one(
            "SELECT request FROM recipe_jobs WHERE job_id=?", (recipe_job_id,)
        )
        if row is None:
            raise ValueError(f"recipe_llm: recipe_jobs row not found: {recipe_job_id!r}")
        req = RecipeRequest.model_validate_json(row["request"])
        result = RecipeEngine(store).suggest(req)
        store.complete_recipe_job(recipe_job_id, result.model_dump_json())
    except Exception as exc:
        # Best-effort status update: if recording the failure itself raises,
        # log it and still surface the ORIGINAL error to the task runner.
        try:
            # Some exceptions stringify to "", which would leave a failed job
            # with no explanation; fall back to the exception class name.
            store.fail_recipe_job(recipe_job_id, str(exc) or type(exc).__name__)
        except Exception:
            log.exception("recipe_llm: could not mark job %s as failed", recipe_job_id)
        raise
    else:
        # Logged outside the try body so a completed job is never flipped to 'failed'.
        log.info("recipe_llm: job %s completed (%d suggestion(s))", recipe_job_id, len(result.suggestions))
    finally:
        store.close()