Adds the recipe_jobs table and background task pipeline for level 3/4
recipe generation. POST ?async=true returns 202 with job_id; clients
poll GET /recipes/jobs/{job_id} until status=done.
Key fix: _enqueue_recipe_job now calls scheduler.enqueue() after
insert_task() to wake the in-memory work queue immediately. Without
this, tasks sat in 'queued' until the scheduler's 30s idle cycle or
an API restart triggered _load_queued_tasks().
- Migration 034: recipe_jobs table (job_id, user_id, status, request,
result, error) with indexes on job_id and user_id/created_at
- Store: create/get/update_running/complete/fail recipe job methods
- runner.py: recipe_llm task type + _run_recipe_llm handler; MUST
call fail_recipe_job() before re-raising so status stays consistent
- CLOUD_MODE guard: falls back to sync generation (scheduler only
polls shared settings DB, not per-user DBs)
- L4 wildcard is covered by the same req.level in (3, 4) dispatch
185 lines · 6 KiB · Python
# app/tasks/runner.py
|
|
"""Kiwi background task runner.

Implements the run_task_fn interface expected by circuitforge_core.tasks.scheduler.
Each kiwi LLM task type has its own handler below.

Public API:
    LLM_TASK_TYPES — frozenset of task type strings to route through the scheduler
    VRAM_BUDGETS — VRAM GB estimates per task type
    insert_task() — deduplicating task insertion
    run_task() — called by the scheduler batch worker
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import sqlite3
|
|
from datetime import date, timedelta
|
|
from pathlib import Path
|
|
|
|
from app.services.expiration_predictor import ExpirationPredictor
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
LLM_TASK_TYPES: frozenset[str] = frozenset({"expiry_llm_fallback", "recipe_llm"})
|
|
|
|
VRAM_BUDGETS: dict[str, float] = {
|
|
# ExpirationPredictor uses a small LLM (16 tokens out, single pass).
|
|
"expiry_llm_fallback": 2.0,
|
|
# Recipe LLM (levels 3-4): full recipe generation, ~200-500 tokens out.
|
|
# Budget assumes a quantized 7B-class model.
|
|
"recipe_llm": 4.0,
|
|
}
|
|
|
|
|
|
def insert_task(
    db_path: Path,
    task_type: str,
    job_id: int,
    *,
    params: str | None = None,
) -> tuple[int, bool]:
    """Insert a background task if no identical task is already in-flight.

    Deduplicates on (task_type, job_id): if a matching row is already
    'queued' or 'running', no new row is created.

    Args:
        db_path: Path to the SQLite database holding background_tasks.
        task_type: Task type string (e.g. "recipe_llm"); see LLM_TASK_TYPES.
        job_id: Integer id the task operates on (meaning depends on task_type).
        params: Optional JSON string of extra task parameters.

    Returns:
        (task_id, True) if a new task was created.
        (existing_id, False) if an identical task is already queued/running.
    """
    conn = sqlite3.connect(db_path)
    try:
        conn.row_factory = sqlite3.Row
        # NOTE(review): SELECT-then-INSERT is not atomic; a concurrent writer
        # could double-insert. Fine for a single API process — confirm callers.
        existing = conn.execute(
            "SELECT id FROM background_tasks "
            "WHERE task_type=? AND job_id=? AND status IN ('queued','running')",
            (task_type, job_id),
        ).fetchone()
        if existing:
            return existing["id"], False
        cursor = conn.execute(
            "INSERT INTO background_tasks (task_type, job_id, params) VALUES (?,?,?)",
            (task_type, job_id, params),
        )
        conn.commit()
        return cursor.lastrowid, True
    finally:
        # Guarantee the handle is released even when execute() raises —
        # the original closed the connection only on the happy paths.
        conn.close()
|
|
|
|
|
|
def _update_task_status(
|
|
db_path: Path, task_id: int, status: str, *, error: str = ""
|
|
) -> None:
|
|
with sqlite3.connect(db_path) as conn:
|
|
conn.execute(
|
|
"UPDATE background_tasks "
|
|
"SET status=?, error=?, updated_at=CURRENT_TIMESTAMP WHERE id=?",
|
|
(status, error, task_id),
|
|
)
|
|
|
|
|
|
def run_task(
    db_path: Path,
    task_id: int,
    task_type: str,
    job_id: int,
    params: str | None = None,
) -> None:
    """Execute one background task. Called by the scheduler's batch worker.

    Marks the background_tasks row 'running', dispatches to the handler for
    task_type, then records 'completed' — or 'failed' with the error text if
    anything raised along the way.
    """
    dispatch = {
        "expiry_llm_fallback": _run_expiry_llm_fallback,
        "recipe_llm": _run_recipe_llm,
    }
    _update_task_status(db_path, task_id, "running")
    try:
        handler = dispatch.get(task_type)
        if handler is None:
            raise ValueError(f"Unknown kiwi task type: {task_type!r}")
        handler(db_path, job_id, params)
        # Inside the try on purpose: a failing status write is still
        # caught below and recorded as a task failure.
        _update_task_status(db_path, task_id, "completed")
    except Exception as exc:
        log.exception("Task %d (%s) failed: %s", task_id, task_type, exc)
        _update_task_status(db_path, task_id, "failed", error=str(exc))
|
|
|
|
|
|
def _run_expiry_llm_fallback(
    db_path: Path,
    item_id: int,
    params: str | None,
) -> None:
    """Predict expiry date via LLM for an inventory item and write result to DB.

    params JSON keys:
        product_name (required) — e.g. "Trader Joe's Organic Tempeh"
        category     (optional) — category hint for the predictor
        location     (optional) — "fridge" | "freezer" | "pantry" (default: "fridge")
    """
    payload = json.loads(params or "{}")
    product_name = payload.get("product_name", "")
    if not product_name:
        raise ValueError("expiry_llm_fallback: 'product_name' is required in params")

    # NOTE(review): reaches into the predictor's private _llm_predict_days —
    # consider promoting that helper to a public method.
    predicted_days = ExpirationPredictor()._llm_predict_days(
        product_name,
        payload.get("category"),
        payload.get("location", "fridge"),
    )
    if predicted_days is None:
        # Best-effort fallback: leave the row untouched rather than guessing.
        log.warning(
            "LLM expiry fallback returned None for item_id=%d product=%r — "
            "expiry_date will remain NULL",
            item_id,
            product_name,
        )
        return

    expiry_iso = (date.today() + timedelta(days=predicted_days)).isoformat()

    with sqlite3.connect(db_path) as conn:
        conn.execute(
            "UPDATE inventory_items SET expiry_date=? WHERE id=?",
            (expiry_iso, item_id),
        )

    log.info(
        "LLM expiry fallback: item_id=%d %r → %s (%d days)",
        item_id,
        product_name,
        expiry_iso,
        predicted_days,
    )
|
|
|
|
|
|
def _run_recipe_llm(db_path: Path, _job_id_int: int, params: str | None) -> None:
    """Run LLM recipe generation for an async recipe job.

    params JSON keys:
        job_id (required) — recipe_jobs.job_id string (e.g. "rec_a1b2c3...")

    Creates its own Store — follows same pattern as _suggest_in_thread.
    MUST call store.fail_recipe_job() before re-raising so recipe_jobs.status
    doesn't stay 'running' while background_tasks shows 'failed'.

    Args:
        db_path: Path to the per-user SQLite database.
        _job_id_int: background_tasks.job_id (unused here; the recipe job is
            keyed by the string job_id carried in params).
        params: JSON string carrying the recipe_jobs.job_id.

    Raises:
        ValueError: if params lacks 'job_id' or the recipe_jobs row is missing.
        Exception: re-raised from the engine so run_task marks the task failed.
    """
    from app.db.store import Store
    from app.models.schemas.recipe import RecipeRequest
    from app.services.recipe.recipe_engine import RecipeEngine

    p = json.loads(params or "{}")
    recipe_job_id: str = p.get("job_id", "")
    if not recipe_job_id:
        raise ValueError("recipe_llm: 'job_id' is required in params")

    store = Store(db_path)
    try:
        store.update_recipe_job_running(recipe_job_id)
        # NOTE(review): uses Store's private _fetch_one — consider a public
        # get_recipe_job_request() accessor.
        row = store._fetch_one(
            "SELECT request FROM recipe_jobs WHERE job_id=?", (recipe_job_id,)
        )
        if row is None:
            raise ValueError(f"recipe_llm: recipe_jobs row not found: {recipe_job_id!r}")
        req = RecipeRequest.model_validate_json(row["request"])
        result = RecipeEngine(store).suggest(req)
        store.complete_recipe_job(recipe_job_id, result.model_dump_json())
        log.info(
            "recipe_llm: job %s completed (%d suggestion(s))",
            recipe_job_id,
            len(result.suggestions),
        )
    except Exception as exc:
        # Keep recipe_jobs.status consistent, but never let a failure while
        # RECORDING the failure mask the original exception — the original
        # code would propagate the bookkeeping error instead of `exc`.
        try:
            store.fail_recipe_job(recipe_job_id, str(exc))
        except Exception:
            log.exception(
                "recipe_llm: could not mark job %s failed", recipe_job_id
            )
        raise
    finally:
        store.close()
|