Merge branch 'feature/recipe-engine'

Recipe engine Phase 3 complete (Tasks 1-15):
- Data pipeline: USDA FDC, FlavorGraph, food.com corpus, substitution pairs
- ElementClassifier, SubstitutionEngine, RecipeEngine (Levels 1-4)
- StyleAdapter with 5 cuisine templates
- LLMRecipeGenerator (Level 3 scaffold + Level 4 wildcard)
- User settings with cooking_equipment for hard day mode
- Grocery affiliate links (Amazon Fresh, Walmart, Instacart)
- Recipe + staple + settings API endpoints with tier gating
- 55 tests passing
This commit is contained in:
pyr0ball 2026-03-31 14:27:32 -07:00
commit aeea8fc1c1
59 changed files with 2900 additions and 9 deletions

View file

@ -0,0 +1,47 @@
"""Recipe suggestion endpoints."""
from __future__ import annotations
import asyncio
from fastapi import APIRouter, Depends, HTTPException
from app.cloud_session import CloudUser, get_session
from app.db.session import get_store
from app.db.store import Store
from app.models.schemas.recipe import RecipeRequest, RecipeResult
from app.services.recipe.recipe_engine import RecipeEngine
from app.tiers import can_use
# Mounted under the /recipes prefix by app.api's api_router.
router = APIRouter()
@router.post("/suggest", response_model=RecipeResult)
async def suggest_recipes(
    req: RecipeRequest,
    session: CloudUser = Depends(get_session),
    store: Store = Depends(get_store),
) -> RecipeResult:
    """Suggest recipes for the given pantry/constraint request.

    Tier and BYOK flags always come from the authenticated session; whatever
    the client put in the request body for those fields is discarded.
    """
    req = req.model_copy(update={"tier": session.tier, "has_byok": session.has_byok})
    wants_wildcard = req.level == 4
    if wants_wildcard and not req.wildcard_confirmed:
        raise HTTPException(
            status_code=400,
            detail="Level 4 (Wildcard) requires wildcard_confirmed=true.",
        )
    needs_llm = req.level in (3, 4)
    if needs_llm and not can_use("recipe_suggestions", req.tier, req.has_byok):
        raise HTTPException(
            status_code=403,
            detail="LLM recipe levels require Paid tier or a configured LLM backend.",
        )
    if req.style_id and not can_use("style_picker", req.tier):
        raise HTTPException(status_code=403, detail="Style picker requires Paid tier.")
    # RecipeEngine.suggest is synchronous; keep it off the event loop.
    return await asyncio.to_thread(RecipeEngine(store).suggest, req)
@router.get("/{recipe_id}")
async def get_recipe(recipe_id: int, store: Store = Depends(get_store)) -> dict:
    """Fetch a single recipe row by id; 404 when it does not exist."""
    found = await asyncio.to_thread(store.get_recipe, recipe_id)
    if not found:
        raise HTTPException(status_code=404, detail="Recipe not found.")
    return found

View file

@ -0,0 +1,46 @@
"""User settings endpoints."""
from __future__ import annotations

import asyncio

from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel

from app.cloud_session import CloudUser, get_session
from app.db.session import get_store
from app.db.store import Store
router = APIRouter()
# Whitelist of settings keys the API will read or write; any other key is
# rejected with 422 by both endpoints below.
_ALLOWED_KEYS = frozenset({"cooking_equipment"})
class SettingBody(BaseModel):
    """Request body for PUT /settings/{key}: the raw string value to store."""
    value: str
@router.get("/{key}")
async def get_setting(
    key: str,
    session: CloudUser = Depends(get_session),
    store: Store = Depends(get_store),
) -> dict:
    """Return the stored value for a settings key.

    Raises:
        HTTPException 422: *key* is not in the allowed-keys whitelist.
        HTTPException 404: *key* is allowed but has no stored value.
    """
    if key not in _ALLOWED_KEYS:
        raise HTTPException(status_code=422, detail=f"Unknown settings key: '{key}'.")
    # Store hits SQLite synchronously — run it off the event loop, matching
    # the recipe endpoints' asyncio.to_thread pattern.
    value = await asyncio.to_thread(store.get_setting, key)
    if value is None:
        raise HTTPException(status_code=404, detail=f"Setting '{key}' not found.")
    return {"key": key, "value": value}
@router.put("/{key}")
async def set_setting(
    key: str,
    body: SettingBody,
    session: CloudUser = Depends(get_session),
    store: Store = Depends(get_store),
) -> dict:
    """Upsert a settings key-value pair and echo it back.

    Raises:
        HTTPException 422: *key* is not in the allowed-keys whitelist.
    """
    if key not in _ALLOWED_KEYS:
        raise HTTPException(status_code=422, detail=f"Unknown settings key: '{key}'.")
    # Store hits SQLite synchronously — run it off the event loop, matching
    # the recipe endpoints' asyncio.to_thread pattern.
    await asyncio.to_thread(store.set_setting, key, body.value)
    return {"key": key, "value": body.value}

View file

@ -0,0 +1,42 @@
"""Staple library endpoints."""
from __future__ import annotations
from fastapi import APIRouter, HTTPException
from app.services.recipe.staple_library import StapleLibrary
router = APIRouter()
# Module-level singleton shared across requests; constructed once at import.
_lib = StapleLibrary()
@router.get("/")
async def list_staples(dietary: str | None = None) -> list[dict]:
    """List staple summaries, optionally filtered by a dietary label."""
    if dietary:
        matched = _lib.filter_by_dietary(dietary)
    else:
        matched = _lib.list_all()
    summaries: list[dict] = []
    for staple in matched:
        summaries.append(
            {
                "slug": staple.slug,
                "name": staple.name,
                "description": staple.description,
                "dietary_labels": staple.dietary_labels,
                "yield_formats": list(staple.yield_formats.keys()),
            }
        )
    return summaries
@router.get("/{slug}")
async def get_staple(slug: str) -> dict:
    """Full detail for one staple; 404 when the slug is unknown."""
    found = _lib.get(slug)
    if not found:
        raise HTTPException(status_code=404, detail=f"Staple '{slug}' not found.")
    detail = {
        "slug": found.slug,
        "name": found.name,
        "description": found.description,
        "dietary_labels": found.dietary_labels,
        "base_ingredients": found.base_ingredients,
        "base_method": found.base_method,
        "base_time_minutes": found.base_time_minutes,
        "yield_formats": found.yield_formats,
        "compatible_styles": found.compatible_styles,
    }
    return detail

View file

@ -1,10 +1,13 @@
from fastapi import APIRouter

from app.api.endpoints import (
    export,
    health,
    inventory,
    ocr,
    receipts,
    recipes,
    settings,
    staples,
)

api_router = APIRouter()
# NOTE: this span previously rendered both the pre- and post-merge lines of
# the diff, which would have imported and registered each router twice.
# Register every router exactly once.
api_router.include_router(health.router, prefix="/health", tags=["health"])
api_router.include_router(receipts.router, prefix="/receipts", tags=["receipts"])
api_router.include_router(ocr.router, prefix="/receipts", tags=["ocr"])  # OCR endpoints live under /receipts
api_router.include_router(export.router, tags=["export"])  # no prefix: the router defines /export itself
api_router.include_router(inventory.router, prefix="/inventory", tags=["inventory"])
api_router.include_router(recipes.router, prefix="/recipes", tags=["recipes"])
api_router.include_router(settings.router, prefix="/settings", tags=["settings"])
api_router.include_router(staples.router, prefix="/staples", tags=["staples"])

View file

@ -9,6 +9,7 @@ CREATE TABLE receipts_new (
id INTEGER PRIMARY KEY AUTOINCREMENT,
filename TEXT NOT NULL,
original_path TEXT NOT NULL,
processed_path TEXT,
status TEXT NOT NULL DEFAULT 'uploaded'
CHECK (status IN (
'uploaded',

View file

@ -0,0 +1,48 @@
-- Migration 006: Ingredient element profiles + FlavorGraph molecule index.
CREATE TABLE ingredient_profiles (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    name_variants TEXT NOT NULL DEFAULT '[]',       -- JSON array of aliases/alternate spellings
    elements TEXT NOT NULL DEFAULT '[]',            -- JSON array: ["Richness","Depth"]
    -- Functional submetadata (from USDA FDC)
    fat_pct REAL DEFAULT 0.0,
    fat_saturated_pct REAL DEFAULT 0.0,
    moisture_pct REAL DEFAULT 0.0,
    protein_pct REAL DEFAULT 0.0,
    starch_pct REAL DEFAULT 0.0,
    binding_score INTEGER DEFAULT 0 CHECK (binding_score BETWEEN 0 AND 3),
    glutamate_mg REAL DEFAULT 0.0,
    ph_estimate REAL,
    sodium_mg_per_100g REAL DEFAULT 0.0,
    smoke_point_c REAL,
    is_fermented INTEGER NOT NULL DEFAULT 0,
    is_emulsifier INTEGER NOT NULL DEFAULT 0,
    -- Aroma submetadata
    flavor_molecule_ids TEXT NOT NULL DEFAULT '[]', -- JSON array of FlavorGraph compound IDs
    heat_stable INTEGER NOT NULL DEFAULT 1,
    add_timing TEXT NOT NULL DEFAULT 'any'
        CHECK (add_timing IN ('early','finish','any')),
    -- Brightness submetadata.
    -- FIX: the original constraint was "acid_type IN ('citric','acetic','lactic',NULL)".
    -- In SQL, "x IN (..., NULL)" evaluates to NULL (not FALSE) for non-matching
    -- values, and a NULL CHECK result passes — so that constraint rejected
    -- nothing. Spell out the NULL case explicitly instead.
    acid_type TEXT CHECK (acid_type IS NULL OR acid_type IN ('citric','acetic','lactic')),
    -- Texture submetadata
    texture_profile TEXT NOT NULL DEFAULT 'neutral',
    water_activity REAL,
    -- Source
    usda_fdc_id TEXT,
    source TEXT NOT NULL DEFAULT 'usda',
    created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE UNIQUE INDEX idx_ingredient_profiles_name ON ingredient_profiles (name);
CREATE INDEX idx_ingredient_profiles_elements ON ingredient_profiles (elements);
CREATE TABLE flavor_molecules (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    compound_id TEXT NOT NULL UNIQUE,               -- FlavorGraph node ID
    compound_name TEXT NOT NULL,
    ingredient_names TEXT NOT NULL DEFAULT '[]',    -- JSON array of ingredient names
    created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
-- No explicit index on compound_id: the UNIQUE constraint above already
-- creates an implicit index, so the original duplicate index was dropped.

View file

@ -0,0 +1,24 @@
-- Migration 007: Recipe corpus index (food.com dataset).
-- One row per imported recipe. List/dict columns are JSON-encoded TEXT;
-- Store deserialises ingredients, ingredient_names, directions, keywords
-- and element_coverage on read.
CREATE TABLE recipes (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    external_id TEXT,                               -- id in the source dataset
    title TEXT NOT NULL,
    ingredients TEXT NOT NULL DEFAULT '[]',         -- JSON array of raw ingredient strings
    ingredient_names TEXT NOT NULL DEFAULT '[]',    -- JSON array of normalized names
    directions TEXT NOT NULL DEFAULT '[]',          -- JSON array of step strings
    category TEXT,
    keywords TEXT NOT NULL DEFAULT '[]',            -- JSON array
    -- Nutrition from the source dataset (per-recipe vs per-serving basis
    -- not established here — confirm at import time)
    calories REAL,
    fat_g REAL,
    protein_g REAL,
    sodium_mg REAL,
    -- Element coverage scores computed at import time
    element_coverage TEXT NOT NULL DEFAULT '{}',    -- JSON {element: 0.0-1.0}
    source TEXT NOT NULL DEFAULT 'foodcom',
    created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX idx_recipes_title ON recipes (title);
CREATE INDEX idx_recipes_category ON recipes (category);
CREATE INDEX idx_recipes_external_id ON recipes (external_id);

View file

@ -0,0 +1,22 @@
-- Migration 008: Derived substitution pairs.
-- Source: diff of lishuyang/recipepairs (GPL-3.0 derivation — raw data not shipped).
CREATE TABLE substitution_pairs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    original_name TEXT NOT NULL,
    substitute_name TEXT NOT NULL,
    constraint_label TEXT NOT NULL, -- 'vegan'|'vegetarian'|'dairy_free'|'gluten_free'|'low_fat'|'low_sodium'
    -- Functional deltas between substitute and original
    -- (sign convention presumed substitute-minus-original — confirm in importer)
    fat_delta REAL DEFAULT 0.0,
    moisture_delta REAL DEFAULT 0.0,
    glutamate_delta REAL DEFAULT 0.0,
    protein_delta REAL DEFAULT 0.0,
    occurrence_count INTEGER DEFAULT 1,
    compensation_hints TEXT NOT NULL DEFAULT '[]', -- JSON [{ingredient, reason, element}]
    source TEXT NOT NULL DEFAULT 'derived',
    created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX idx_substitution_pairs_original ON substitution_pairs (original_name);
CREATE INDEX idx_substitution_pairs_constraint ON substitution_pairs (constraint_label);
-- At most one row per (original, substitute, constraint) combination.
CREATE UNIQUE INDEX idx_substitution_pairs_pair
    ON substitution_pairs (original_name, substitute_name, constraint_label);

View file

@ -0,0 +1,27 @@
-- Migration 009: Staple library (bulk-preparable base components).
CREATE TABLE staples (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    slug TEXT NOT NULL UNIQUE,      -- stable identifier used by the /staples API
    name TEXT NOT NULL,
    description TEXT,
    base_ingredients TEXT NOT NULL DEFAULT '[]', -- JSON array of ingredient strings
    base_method TEXT,
    base_time_minutes INTEGER,
    yield_formats TEXT NOT NULL DEFAULT '{}',    -- JSON {format_name: {elements, shelf_days, methods, texture}}
    dietary_labels TEXT NOT NULL DEFAULT '[]',   -- JSON ['vegan','high-protein']
    compatible_styles TEXT NOT NULL DEFAULT '[]',-- JSON [style_id]
    created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
-- A user's prepared instances of staples.
CREATE TABLE user_staples (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    -- FK targets the UNIQUE slug column (valid in SQLite); note SQLite only
    -- enforces foreign keys when PRAGMA foreign_keys = ON is set per connection.
    staple_slug TEXT NOT NULL REFERENCES staples(slug) ON DELETE CASCADE,
    active_format TEXT NOT NULL,    -- presumably a key into staples.yield_formats — confirm
    quantity_g REAL,
    prepared_at TEXT,
    notes TEXT,
    created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX idx_user_staples_slug ON user_staples (staple_slug);

View file

@ -0,0 +1,15 @@
-- Migration 010: User substitution approval log (opt-in dataset moat).
-- One row per user verdict on a proposed substitution; rows with opted_in=1
-- are candidates for anonymized sharing.
CREATE TABLE substitution_feedback (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    original_name TEXT NOT NULL,
    substitute_name TEXT NOT NULL,
    constraint_label TEXT,          -- nullable: feedback may be unconstrained
    compensation_used TEXT NOT NULL DEFAULT '[]', -- JSON array of compensation ingredient names
    approved INTEGER NOT NULL DEFAULT 0,          -- 1 = user accepted the substitution
    opted_in INTEGER NOT NULL DEFAULT 0,          -- user consented to anonymized sharing
    created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX idx_substitution_feedback_original ON substitution_feedback (original_name);
CREATE INDEX idx_substitution_feedback_opted_in ON substitution_feedback (opted_in);

View file

@ -0,0 +1,11 @@
-- Migration 011: Daily rate limits (leftover mode: 5/day free tier).
-- One counter row per feature per calendar day; Store upserts via
-- "ON CONFLICT(feature, window_date) DO UPDATE SET count = count + 1".
CREATE TABLE rate_limits (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    feature TEXT NOT NULL,          -- feature key, e.g. 'leftover_mode'
    window_date TEXT NOT NULL,      -- YYYY-MM-DD
    count INTEGER NOT NULL DEFAULT 0,
    -- The UNIQUE constraint creates an implicit index on (feature, window_date)
    -- and is the upsert conflict target; the original explicit CREATE INDEX on
    -- the same column pair was a pure duplicate and has been dropped.
    UNIQUE (feature, window_date)
);

View file

@ -0,0 +1,6 @@
-- Migration 012: User settings key-value store.
-- Plain string KV. Values needing structure (e.g. cooking_equipment) are
-- stored as JSON text and parsed by the reader (see RecipeEngine.suggest).
CREATE TABLE IF NOT EXISTS user_settings (
    key TEXT PRIMARY KEY,
    value TEXT NOT NULL
);

View file

@ -32,7 +32,10 @@ class Store:
# Deserialise any TEXT columns that contain JSON
for key in ("metadata", "nutrition_data", "source_data", "items",
"metrics", "improvement_suggestions", "confidence_scores",
"warnings"):
"warnings",
# recipe columns
"ingredients", "ingredient_names", "directions",
"keywords", "element_coverage"):
if key in d and isinstance(d[key], str):
try:
d[key] = json.loads(d[key])
@ -260,3 +263,107 @@ class Store:
return self._fetch_one(
"SELECT * FROM receipt_data WHERE receipt_id = ?", (receipt_id,)
)
# ── recipes ───────────────────────────────────────────────────────────
def search_recipes_by_ingredients(
self,
ingredient_names: list[str],
limit: int = 20,
category: str | None = None,
) -> list[dict]:
"""Find recipes containing any of the given ingredient names.
Scores by match count and returns highest-scoring first."""
if not ingredient_names:
return []
like_params = [f'%"{n}"%' for n in ingredient_names]
like_clauses = " OR ".join(
"r.ingredient_names LIKE ?" for _ in ingredient_names
)
match_score = " + ".join(
"CASE WHEN r.ingredient_names LIKE ? THEN 1 ELSE 0 END"
for _ in ingredient_names
)
category_clause = ""
category_params: list = []
if category:
category_clause = "AND r.category = ?"
category_params = [category]
sql = f"""
SELECT r.*, ({match_score}) AS match_count
FROM recipes r
WHERE ({like_clauses})
{category_clause}
ORDER BY match_count DESC, r.id ASC
LIMIT ?
"""
all_params = like_params + like_params + category_params + [limit]
return self._fetch_all(sql, tuple(all_params))
def get_recipe(self, recipe_id: int) -> dict | None:
return self._fetch_one("SELECT * FROM recipes WHERE id = ?", (recipe_id,))
# ── rate limits ───────────────────────────────────────────────────────
def check_and_increment_rate_limit(
    self, feature: str, daily_max: int
) -> tuple[bool, int]:
    """Check today's counter for *feature*; increment only while under the cap.

    Returns (allowed, current_count). A rejected call performs no write, so
    rejections never consume quota.
    """
    from datetime import date

    window = date.today().isoformat()
    existing = self._fetch_one(
        "SELECT count FROM rate_limits WHERE feature = ? AND window_date = ?",
        (feature, window),
    )
    used = existing["count"] if existing else 0
    if used >= daily_max:
        return (False, used)
    # Upsert: first hit of the day inserts a 1, later hits bump the counter.
    self.conn.execute("""
        INSERT INTO rate_limits (feature, window_date, count)
        VALUES (?, ?, 1)
        ON CONFLICT(feature, window_date) DO UPDATE SET count = count + 1
    """, (feature, window))
    self.conn.commit()
    return (True, used + 1)
# ── user settings ────────────────────────────────────────────────────
def get_setting(self, key: str) -> str | None:
"""Return the value for a settings key, or None if not set."""
row = self._fetch_one(
"SELECT value FROM user_settings WHERE key = ?", (key,)
)
return row["value"] if row else None
def set_setting(self, key: str, value: str) -> None:
    """Insert or overwrite a settings key-value pair (upsert on key)."""
    sql = (
        "INSERT INTO user_settings (key, value) VALUES (?, ?)"
        " ON CONFLICT(key) DO UPDATE SET value = excluded.value"
    )
    self.conn.execute(sql, (key, value))
    self.conn.commit()
# ── substitution feedback ─────────────────────────────────────────────
def log_substitution_feedback(
self,
original: str,
substitute: str,
constraint: str | None,
compensation_used: list[str],
approved: bool,
opted_in: bool,
) -> None:
self.conn.execute("""
INSERT INTO substitution_feedback
(original_name, substitute_name, constraint_label,
compensation_used, approved, opted_in)
VALUES (?,?,?,?,?,?)
""", (
original, substitute, constraint,
self._dump(compensation_used),
int(approved), int(opted_in),
))
self.conn.commit()

View file

@ -0,0 +1,54 @@
"""Pydantic schemas for the recipe engine API."""
from __future__ import annotations
from pydantic import BaseModel, Field
class SwapCandidate(BaseModel):
    """A suggested ingredient substitution attached to a recipe suggestion."""
    original_name: str
    substitute_name: str
    constraint_label: str  # dietary constraint the swap satisfies, e.g. 'vegan'
    explanation: str
    compensation_hints: list[dict] = Field(default_factory=list)  # [{ingredient, reason, element}]
class RecipeSuggestion(BaseModel):
    """One candidate recipe within a RecipeResult."""
    id: int  # corpus row id; 0 for LLM-generated suggestions
    title: str
    match_count: int  # number of pantry items this recipe matched
    element_coverage: dict[str, float] = Field(default_factory=dict)  # {element: 0.0-1.0}
    swap_candidates: list[SwapCandidate] = Field(default_factory=list)
    missing_ingredients: list[str] = Field(default_factory=list)
    directions: list[str] = Field(default_factory=list)
    notes: str = ""
    level: int = 1  # creativity level (1-4) that produced this suggestion
    is_wildcard: bool = False  # True only for Level 4 (wildcard) output
class GroceryLink(BaseModel):
    """Affiliate deeplink for one missing ingredient at one retailer."""
    ingredient: str
    retailer: str  # display name, e.g. "Amazon Fresh"
    url: str
class RecipeResult(BaseModel):
    """Response payload for POST /recipes/suggest."""
    suggestions: list[RecipeSuggestion]
    element_gaps: list[str]  # elements with no coverage across the pantry
    grocery_list: list[str] = Field(default_factory=list)
    grocery_links: list[GroceryLink] = Field(default_factory=list)
    rate_limited: bool = False  # True when the free-tier leftover-mode daily cap was hit
    rate_limit_count: int = 0  # counter value at the time of rejection
class RecipeRequest(BaseModel):
    """Input for POST /recipes/suggest.

    tier and has_byok are overwritten server-side from the authenticated
    session before use — client-supplied values are ignored.
    """
    pantry_items: list[str]
    level: int = Field(default=1, ge=1, le=4)  # 1-2 deterministic, 3-4 LLM-backed
    constraints: list[str] = Field(default_factory=list)  # dietary constraint labels
    expiry_first: bool = False  # leftover mode; rate-limited on free tier
    hard_day_mode: bool = False  # restrict to easy-method recipes
    max_missing: int | None = None  # drop recipes missing more than N ingredients
    style_id: str | None = None  # cuisine style template (Paid-tier feature)
    tier: str = "free"
    has_byok: bool = False
    wildcard_confirmed: bool = False  # must be True for level 4
    allergies: list[str] = Field(default_factory=list)

View file

View file

@ -0,0 +1,135 @@
"""
ElementClassifier -- classify pantry items into culinary element tags.
Lookup order:
1. ingredient_profiles table (pre-computed from USDA FDC)
2. Keyword heuristic fallback (for unlisted ingredients)
"""
from __future__ import annotations
import json
from dataclasses import dataclass, field
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from app.db.store import Store
# All valid ingredient-level element labels (Method is recipe-level, not ingredient-level)
ELEMENTS = frozenset({
    "Seasoning", "Richness", "Brightness", "Depth",
    "Aroma", "Structure", "Texture",
})

# Fallback keyword table: (substring keywords, element) pairs, checked in order.
# A lowercased ingredient name matches a row when ANY keyword occurs in it as a
# substring; each element is added at most once (see _heuristic_profile).
_HEURISTIC: list[tuple[list[str], str]] = [
    (["vinegar", "lemon", "lime", "citrus", "wine", "yogurt", "kefir",
      "buttermilk", "tomato", "tamarind"], "Brightness"),
    (["oil", "butter", "cream", "lard", "fat", "avocado", "coconut milk",
      "ghee", "shortening", "crisco"], "Richness"),
    (["salt", "soy", "miso", "tamari", "fish sauce", "worcestershire",
      "anchov", "capers", "olive", "brine"], "Seasoning"),
    (["mushroom", "parmesan", "miso", "nutritional yeast", "bouillon",
      "broth", "umami", "anchov", "dried tomato", "soy"], "Depth"),
    (["garlic", "onion", "shallot", "herb", "basil", "oregano", "thyme",
      "rosemary", "spice", "cumin", "coriander", "paprika", "chili",
      "ginger", "cinnamon", "pepper", "cilantro", "dill", "fennel",
      "cardamom", "turmeric", "smoke"], "Aroma"),
    (["flour", "starch", "cornstarch", "arrowroot", "egg", "gelatin",
      "agar", "breadcrumb", "panko", "roux"], "Structure"),
    (["nut", "seed", "cracker", "crisp", "wafer", "chip", "crouton",
      "granola", "tofu", "tempeh"], "Texture"),
]
def _safe_json_list(val) -> list:
if isinstance(val, list):
return val
if isinstance(val, str):
try:
return json.loads(val)
except Exception:
return []
return []
@dataclass(frozen=True)
class IngredientProfile:
    """Element tags plus functional submetadata for one ingredient.

    Instances come either from the ingredient_profiles table (source="db")
    or from the keyword fallback (source="heuristic"), in which case only
    name/elements are meaningful and the rest keep their defaults.
    """
    name: str
    elements: list[str]  # subset of ELEMENTS, e.g. ["Richness", "Depth"]
    # Functional submetadata (from USDA FDC; percentage basis not shown here — confirm in importer)
    fat_pct: float = 0.0
    fat_saturated_pct: float = 0.0
    moisture_pct: float = 0.0
    protein_pct: float = 0.0
    starch_pct: float = 0.0
    binding_score: int = 0  # 0-3, per the migration 006 CHECK constraint
    glutamate_mg: float = 0.0
    ph_estimate: float | None = None
    # Aroma submetadata
    flavor_molecule_ids: list[str] = field(default_factory=list)  # FlavorGraph compound IDs
    heat_stable: bool = True
    add_timing: str = "any"  # 'early' | 'finish' | 'any'
    # Brightness submetadata
    acid_type: str | None = None  # 'citric' | 'acetic' | 'lactic' | None
    sodium_mg_per_100g: float = 0.0
    is_fermented: bool = False
    # Texture submetadata
    texture_profile: str = "neutral"
    smoke_point_c: float | None = None
    is_emulsifier: bool = False
    source: str = "heuristic"  # "db" when loaded from ingredient_profiles
class ElementClassifier:
    """Classify pantry ingredient names into culinary element tags.

    Lookup order: exact lowercased-name hit in the ingredient_profiles table
    first; keyword-heuristic fallback for anything not in the table.
    """

    def __init__(self, store: "Store") -> None:
        self._store = store

    def classify(self, ingredient_name: str) -> IngredientProfile:
        """Return element profile for a single ingredient name."""
        # The DB query below keys on the lowercased, trimmed name.
        name = ingredient_name.lower().strip()
        if not name:
            # Blank input: return an empty heuristic profile, skip the DB.
            return IngredientProfile(name="", elements=[], source="heuristic")
        row = self._store._fetch_one(
            "SELECT * FROM ingredient_profiles WHERE name = ?", (name,)
        )
        if row:
            return self._row_to_profile(row)
        return self._heuristic_profile(name)

    def classify_batch(self, names: list[str]) -> list[IngredientProfile]:
        """Classify each name independently (one DB lookup per name)."""
        return [self.classify(n) for n in names]

    def identify_gaps(self, profiles: list[IngredientProfile]) -> list[str]:
        """Return element names that have no coverage in the given profile list."""
        covered = set()
        for p in profiles:
            covered.update(p.elements)
        # Sorted for deterministic gap ordering.
        return sorted(ELEMENTS - covered)

    def _row_to_profile(self, row: dict) -> IngredientProfile:
        """Map an ingredient_profiles DB row onto an IngredientProfile.

        JSON columns go through _safe_json_list; numeric columns use `or 0.0`
        so SQL NULLs collapse to zero rather than None; 0/1 ints become bools.
        """
        return IngredientProfile(
            name=row["name"],
            elements=_safe_json_list(row.get("elements")),
            fat_pct=row.get("fat_pct") or 0.0,
            fat_saturated_pct=row.get("fat_saturated_pct") or 0.0,
            moisture_pct=row.get("moisture_pct") or 0.0,
            protein_pct=row.get("protein_pct") or 0.0,
            starch_pct=row.get("starch_pct") or 0.0,
            binding_score=row.get("binding_score") or 0,
            glutamate_mg=row.get("glutamate_mg") or 0.0,
            ph_estimate=row.get("ph_estimate"),
            flavor_molecule_ids=_safe_json_list(row.get("flavor_molecule_ids")),
            heat_stable=bool(row.get("heat_stable", 1)),
            add_timing=row.get("add_timing") or "any",
            acid_type=row.get("acid_type"),
            sodium_mg_per_100g=row.get("sodium_mg_per_100g") or 0.0,
            is_fermented=bool(row.get("is_fermented", 0)),
            texture_profile=row.get("texture_profile") or "neutral",
            smoke_point_c=row.get("smoke_point_c"),
            is_emulsifier=bool(row.get("is_emulsifier", 0)),
            source="db",
        )

    def _heuristic_profile(self, name: str) -> IngredientProfile:
        """Fallback: substring-match the keyword table, keeping first-hit order."""
        seen: set[str] = set()
        elements: list[str] = []
        for keywords, element in _HEURISTIC:
            # Each element is added at most once, in table order.
            if element not in seen and any(kw in name for kw in keywords):
                elements.append(element)
                seen.add(element)
        return IngredientProfile(name=name, elements=elements, source="heuristic")

View file

@ -0,0 +1,75 @@
"""
GroceryLinkBuilder affiliate deeplinks for missing ingredient grocery lists.
Free tier: URL construction only (Amazon Fresh, Walmart, Instacart).
Paid+: live product search API (stubbed future task).
Config (env vars, all optional missing = retailer disabled):
AMAZON_AFFILIATE_TAG e.g. "circuitforge-20"
INSTACART_AFFILIATE_ID e.g. "circuitforge"
WALMART_AFFILIATE_ID e.g. "circuitforge" (Impact affiliate network)
"""
from __future__ import annotations
import os
from urllib.parse import quote_plus
from app.models.schemas.recipe import GroceryLink
def _amazon_link(ingredient: str, tag: str) -> GroceryLink:
    """Amazon Fresh search deeplink carrying the affiliate tag."""
    term = quote_plus(ingredient)
    return GroceryLink(
        ingredient=ingredient,
        retailer="Amazon Fresh",
        url=f"https://www.amazon.com/s?k={term}&i=amazonfresh&tag={tag}",
    )
def _walmart_link(ingredient: str, affiliate_id: str) -> GroceryLink:
    """Walmart Grocery search via the Impact affiliate redirect.

    The target URL travels as the `u=` query parameter of the
    goto.walmart.com deeplink, so it must itself be percent-encoded —
    previously its raw `?` and `=` were parsed as part of the OUTER URL,
    dropping the search query from the redirect target.
    """
    target = f"https://www.walmart.com/search?q={quote_plus(ingredient)}"
    url = f"https://goto.walmart.com/c/{affiliate_id}/walmart?u={quote_plus(target)}"
    return GroceryLink(ingredient=ingredient, retailer="Walmart Grocery", url=url)
def _instacart_link(ingredient: str, affiliate_id: str) -> GroceryLink:
    """Instacart store-search deeplink tagged with the affiliate id."""
    term = quote_plus(ingredient)
    return GroceryLink(
        ingredient=ingredient,
        retailer="Instacart",
        url=f"https://www.instacart.com/store/s?k={term}&aff={affiliate_id}",
    )
class GroceryLinkBuilder:
    """Builds retailer affiliate deeplinks for missing-ingredient lists.

    A retailer is enabled only when its affiliate env var is set (see the
    module docstring). Free tier gets URL construction only; the Paid+ live
    product-search path is a stubbed future task.
    """

    def __init__(self, tier: str = "free", has_byok: bool = False) -> None:
        self._tier = tier
        self._has_byok = has_byok
        env = os.environ.get
        self._amazon_tag = env("AMAZON_AFFILIATE_TAG", "")
        self._instacart_id = env("INSTACART_AFFILIATE_ID", "")
        self._walmart_id = env("WALMART_AFFILIATE_ID", "")

    def build_links(self, ingredient: str) -> list[GroceryLink]:
        """Return deeplinks for one ingredient; empty list for blank input."""
        if not ingredient.strip():
            return []
        retailers = (
            (self._amazon_tag, _amazon_link),
            (self._walmart_id, _walmart_link),
            (self._instacart_id, _instacart_link),
        )
        links = [build(ingredient, cred) for cred, build in retailers if cred]
        # Paid+: live API stub (future task)
        # if self._tier in ("paid", "premium") and not self._has_byok:
        #     links.extend(self._search_kroger_api(ingredient))
        return links

    def build_all(self, ingredients: list[str]) -> list[GroceryLink]:
        """Flatten build_links over a list, preserving input order."""
        return [
            link
            for ingredient in ingredients
            for link in self.build_links(ingredient)
        ]

View file

@ -0,0 +1,210 @@
"""LLM-driven recipe generator for Levels 3 and 4."""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from app.db.store import Store
from app.models.schemas.recipe import RecipeRequest, RecipeResult, RecipeSuggestion
from app.services.recipe.element_classifier import IngredientProfile
from app.services.recipe.style_adapter import StyleAdapter
logger = logging.getLogger(__name__)
def _filter_allergies(pantry_items: list[str], allergies: list[str]) -> list[str]:
"""Return pantry items with allergy matches removed (bidirectional substring)."""
if not allergies:
return list(pantry_items)
return [
item for item in pantry_items
if not any(
allergy.lower() in item.lower() or item.lower() in allergy.lower()
for allergy in allergies
)
]
class LLMRecipeGenerator:
    """Builds Level 3/4 prompts, calls the LLM router, parses the reply."""

    def __init__(self, store: "Store") -> None:
        self._store = store
        self._style_adapter = StyleAdapter()

    def build_level3_prompt(
        self,
        req: RecipeRequest,
        profiles: list[IngredientProfile],
        gaps: list[str],
    ) -> str:
        """Build a structured element-scaffold prompt for Level 3.

        Allergy-matching pantry items are removed before they ever reach the
        prompt; covered/missing culinary elements and (optionally) a cuisine
        style template steer the model.
        """
        allergy_list = req.allergies
        safe_pantry = _filter_allergies(req.pantry_items, allergy_list)
        # Preserve first-seen order of covered elements (a set would lose it).
        covered_elements: list[str] = []
        for profile in profiles:
            for element in profile.elements:
                if element not in covered_elements:
                    covered_elements.append(element)
        lines: list[str] = [
            "You are a creative chef. Generate a recipe using the ingredients below.",
            "",
            f"Pantry items: {', '.join(safe_pantry)}",
        ]
        if req.constraints:
            lines.append(f"Dietary constraints: {', '.join(req.constraints)}")
        if allergy_list:
            lines.append(f"IMPORTANT — must NOT contain: {', '.join(allergy_list)}")
        lines.append("")
        lines.append(f"Covered culinary elements: {', '.join(covered_elements) or 'none'}")
        if gaps:
            lines.append(
                f"Missing elements to address: {', '.join(gaps)}. "
                "Incorporate ingredients or techniques to fill these gaps."
            )
        if req.style_id:
            template = self._style_adapter.get(req.style_id)
            if template:
                lines.append(f"Cuisine style: {template.name}")
                if template.aromatics:
                    # Cap at 4 aromatics to keep the prompt compact.
                    lines.append(f"Preferred aromatics: {', '.join(template.aromatics[:4])}")
        # The reply format here is what _parse_response expects.
        lines += [
            "",
            "Reply in this format:",
            "Title: <recipe name>",
            "Ingredients: <comma-separated list>",
            "Directions: <numbered steps>",
            "Notes: <optional tips>",
        ]
        return "\n".join(lines)

    def build_level4_prompt(
        self,
        req: RecipeRequest,
    ) -> str:
        """Build a minimal wildcard prompt for Level 4.

        Deliberately unstructured compared to Level 3 — no element scaffold —
        but allergy filtering still applies to the pantry list.
        """
        allergy_list = req.allergies
        safe_pantry = _filter_allergies(req.pantry_items, allergy_list)
        lines: list[str] = [
            "Surprise me with a creative, unexpected recipe.",
            f"Ingredients available: {', '.join(safe_pantry)}",
        ]
        if req.constraints:
            lines.append(f"Constraints: {', '.join(req.constraints)}")
        if allergy_list:
            lines.append(f"Must NOT contain: {', '.join(allergy_list)}")
        lines += [
            "Treat any mystery ingredient as a wildcard — use your imagination.",
            "Title: <name> | Ingredients: <list> | Directions: <steps>",
        ]
        return "\n".join(lines)

    def _call_llm(self, prompt: str) -> str:
        """Call the LLM router and return the response text.

        Returns "" on ANY failure (import error included); generate() treats
        an empty response as "no suggestions" rather than raising.
        """
        try:
            # NOTE(review): imported lazily — presumably so the module still
            # loads in environments without the LLM backend; confirm.
            from circuitforge_core.llm.router import LLMRouter
            router = LLMRouter()
            return router.complete(prompt)
        except Exception as exc:
            logger.error("LLM call failed: %s", exc)
            return ""

    def _parse_response(self, response: str) -> dict[str, str | list[str]]:
        """Parse LLM response text into structured recipe fields.

        Recognizes "Title:", "Ingredients:", "Directions:", "Notes:" headers
        (case-insensitive, at line start). Non-header lines accumulate into
        the most recent header's field; lines before the first header are
        discarded. "ingredients" is comma-split into a list; the other keys
        stay joined strings.
        """
        result: dict[str, str | list[str]] = {
            "title": "",
            "ingredients": [],
            "directions": "",
            "notes": "",
        }
        current_key: str | None = None
        buffer: list[str] = []

        def _flush(key: str | None, buf: list[str]) -> None:
            # Commit the accumulated lines for the previous header, if any.
            if key is None or not buf:
                return
            text = " ".join(buf).strip()
            if key == "ingredients":
                result["ingredients"] = [i.strip() for i in text.split(",") if i.strip()]
            else:
                result[key] = text

        for line in response.splitlines():
            lower = line.lower().strip()
            if lower.startswith("title:"):
                _flush(current_key, buffer)
                # Keep the original-cased text after the first colon.
                current_key, buffer = "title", [line.split(":", 1)[1].strip()]
            elif lower.startswith("ingredients:"):
                _flush(current_key, buffer)
                current_key, buffer = "ingredients", [line.split(":", 1)[1].strip()]
            elif lower.startswith("directions:"):
                _flush(current_key, buffer)
                current_key, buffer = "directions", [line.split(":", 1)[1].strip()]
            elif lower.startswith("notes:"):
                _flush(current_key, buffer)
                current_key, buffer = "notes", [line.split(":", 1)[1].strip()]
            elif current_key and line.strip():
                buffer.append(line.strip())
        _flush(current_key, buffer)
        return result

    def generate(
        self,
        req: RecipeRequest,
        profiles: list[IngredientProfile],
        gaps: list[str],
    ) -> RecipeResult:
        """Generate a recipe via LLM and return a RecipeResult.

        Level 4 uses the minimal wildcard prompt; any other level routed here
        uses the Level 3 element scaffold. An empty LLM response yields a
        result with no suggestions and the original gaps.
        """
        if req.level == 4:
            prompt = self.build_level4_prompt(req)
        else:
            prompt = self.build_level3_prompt(req, profiles, gaps)
        response = self._call_llm(prompt)
        if not response:
            return RecipeResult(suggestions=[], element_gaps=gaps)
        parsed = self._parse_response(response)
        raw_directions = parsed.get("directions", "")
        # NOTE(review): splitting on '.' fragments numbered prefixes
        # ("1. Chop" -> "1", "Chop") and decimals ("0.5 cup") — consider
        # splitting on newlines in the raw response instead.
        directions_list: list[str] = (
            [s.strip() for s in raw_directions.split(".") if s.strip()]
            if isinstance(raw_directions, str)
            else list(raw_directions)
        )
        raw_notes = parsed.get("notes", "")
        notes_str: str = raw_notes if isinstance(raw_notes, str) else ""
        suggestion = RecipeSuggestion(
            id=0,  # LLM output has no corpus row id
            title=parsed.get("title") or "LLM Recipe",
            match_count=len(req.pantry_items),
            element_coverage={},
            # The model's whole ingredient list is surfaced here — there is
            # no corpus row to diff against the pantry.
            missing_ingredients=list(parsed.get("ingredients", [])),
            directions=directions_list,
            notes=notes_str,
            level=req.level,
            is_wildcard=(req.level == 4),
        )
        return RecipeResult(
            suggestions=[suggestion],
            element_gaps=gaps,
        )

View file

@ -0,0 +1,186 @@
"""
RecipeEngine orchestrates the four creativity levels.
Level 1: corpus lookup ranked by ingredient match + expiry urgency
Level 2: Level 1 + deterministic substitution swaps
Level 3: element scaffold LLM constrained prompt (see llm_recipe.py)
Level 4: wildcard LLM (see llm_recipe.py)
Amendments:
- max_missing: filter to recipes missing N pantry items
- hard_day_mode: filter to easy-method recipes only
- grocery_list: aggregated missing ingredients across suggestions
"""
from __future__ import annotations
import json
import re
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from app.db.store import Store
from app.models.schemas.recipe import GroceryLink, RecipeRequest, RecipeResult, RecipeSuggestion, SwapCandidate
from app.services.recipe.element_classifier import ElementClassifier
from app.services.recipe.grocery_links import GroceryLinkBuilder
from app.services.recipe.substitution_engine import SubstitutionEngine
_LEFTOVER_DAILY_MAX_FREE = 5
# Method complexity classification patterns
_EASY_METHODS = re.compile(
r"\b(microwave|mix|stir|blend|toast|assemble|heat)\b", re.IGNORECASE
)
_INVOLVED_METHODS = re.compile(
r"\b(braise|roast|knead|deep.?fry|fry|sauté|saute|bake|boil)\b", re.IGNORECASE
)
def _classify_method_complexity(
directions: list[str],
available_equipment: list[str] | None = None,
) -> str:
"""Classify recipe method complexity from direction strings.
Returns 'easy', 'moderate', or 'involved'.
available_equipment can expand the easy set (e.g. ['toaster', 'air fryer']).
"""
text = " ".join(directions).lower()
equipment_set = {e.lower() for e in (available_equipment or [])}
if _INVOLVED_METHODS.search(text):
return "involved"
if _EASY_METHODS.search(text):
return "easy"
# Check equipment-specific easy methods
for equip in equipment_set:
if equip in text:
return "easy"
return "moderate"
class RecipeEngine:
    """Orchestrates recipe suggestion across the four creativity levels.

    Levels 1-2 run deterministically against the local recipe corpus;
    Levels 3-4 are delegated to LLMRecipeGenerator (imported lazily).
    """

    def __init__(self, store: "Store") -> None:
        self._store = store
        self._classifier = ElementClassifier(store)
        self._substitution = SubstitutionEngine(store)

    def suggest(
        self,
        req: RecipeRequest,
        available_equipment: list[str] | None = None,
    ) -> RecipeResult:
        """Produce suggestions plus a grocery list/links for one request.

        available_equipment overrides the stored 'cooking_equipment' setting;
        when None and hard_day_mode is set, the setting is loaded from the
        store. May short-circuit with rate_limited=True for free-tier
        leftover (expiry-first) mode.
        """
        # Load cooking equipment from user settings when hard_day_mode is active
        if req.hard_day_mode and available_equipment is None:
            equipment_json = self._store.get_setting("cooking_equipment")
            if equipment_json:
                try:
                    available_equipment = json.loads(equipment_json)
                except (json.JSONDecodeError, TypeError):
                    # Corrupt setting: treat as "no special equipment" rather than fail.
                    available_equipment = []
            else:
                available_equipment = []
        # Rate-limit leftover mode for free tier
        if req.expiry_first and req.tier == "free":
            allowed, count = self._store.check_and_increment_rate_limit(
                "leftover_mode", _LEFTOVER_DAILY_MAX_FREE
            )
            if not allowed:
                # Early return: no suggestions computed when over the daily cap.
                return RecipeResult(
                    suggestions=[], element_gaps=[], rate_limited=True, rate_limit_count=count
                )
        # Element profiles for pantry items, and which elements are uncovered.
        profiles = self._classifier.classify_batch(req.pantry_items)
        gaps = self._classifier.identify_gaps(profiles)
        pantry_set = {item.lower().strip() for item in req.pantry_items}
        if req.level >= 3:
            # Lazy import keeps LLM machinery off the deterministic path.
            from app.services.recipe.llm_recipe import LLMRecipeGenerator
            gen = LLMRecipeGenerator(self._store)
            return gen.generate(req, profiles, gaps)
        # Level 1 & 2: deterministic path
        rows = self._store.search_recipes_by_ingredients(req.pantry_items, limit=20)
        suggestions = []
        for row in rows:
            ingredient_names: list[str] = row.get("ingredient_names") or []
            if isinstance(ingredient_names, str):
                # Some rows store the list JSON-encoded.
                try:
                    ingredient_names = json.loads(ingredient_names)
                except Exception:
                    ingredient_names = []
            # Compute missing ingredients
            missing = [n for n in ingredient_names if n.lower() not in pantry_set]
            # Filter by max_missing
            if req.max_missing is not None and len(missing) > req.max_missing:
                continue
            # Filter by hard_day_mode
            if req.hard_day_mode:
                directions: list[str] = row.get("directions") or []
                if isinstance(directions, str):
                    try:
                        directions = json.loads(directions)
                    except Exception:
                        # Not JSON: treat the whole string as a single direction.
                        directions = [directions]
                complexity = _classify_method_complexity(directions, available_equipment)
                if complexity == "involved":
                    continue
            # Build swap candidates for Level 2
            swap_candidates: list[SwapCandidate] = []
            if req.level == 2 and req.constraints:
                for ing in ingredient_names:
                    for constraint in req.constraints:
                        swaps = self._substitution.find_substitutes(ing, constraint)
                        # Only the top-ranked swap per (ingredient, constraint).
                        for swap in swaps[:1]:
                            swap_candidates.append(SwapCandidate(
                                original_name=swap.original_name,
                                substitute_name=swap.substitute_name,
                                constraint_label=swap.constraint_label,
                                explanation=swap.explanation,
                                compensation_hints=swap.compensation_hints,
                            ))
            coverage_raw = row.get("element_coverage") or {}
            if isinstance(coverage_raw, str):
                try:
                    coverage_raw = json.loads(coverage_raw)
                except Exception:
                    coverage_raw = {}
            suggestions.append(RecipeSuggestion(
                id=row["id"],
                title=row["title"],
                match_count=int(row.get("match_count") or 0),
                element_coverage=coverage_raw,
                swap_candidates=swap_candidates,
                missing_ingredients=missing,
                level=req.level,
            ))
        # Build grocery list — deduplicated union of all missing ingredients
        seen: set[str] = set()
        grocery_list: list[str] = []
        for s in suggestions:
            for item in s.missing_ingredients:
                if item not in seen:
                    grocery_list.append(item)
                    seen.add(item)
        # Build grocery links — affiliate deeplinks for each missing ingredient
        link_builder = GroceryLinkBuilder(tier=req.tier, has_byok=req.has_byok)
        grocery_links = link_builder.build_all(grocery_list)
        return RecipeResult(
            suggestions=suggestions,
            element_gaps=gaps,
            grocery_list=grocery_list,
            grocery_links=grocery_links,
        )

View file

@ -0,0 +1,60 @@
"""
StapleLibrary -- bulk-preparable base component reference data.
Loaded from YAML files in app/staples/.
"""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import yaml
_STAPLES_DIR = Path(__file__).parents[2] / "staples"
@dataclass(frozen=True)
class StapleEntry:
    """One bulk-preparable staple (e.g. seitan, tempeh) loaded from a YAML file."""

    slug: str                      # stable identifier; used as the library key
    name: str                      # display name
    description: str
    dietary_labels: list[str]      # e.g. ["vegan", "high-protein"]
    base_ingredients: list[str]
    base_method: str               # primary preparation method for the base batch
    base_time_minutes: int         # active prep time for the base batch
    yield_formats: dict[str, Any]  # format slug -> storage/method/texture metadata
    compatible_styles: list[str]   # style_ids this staple pairs with
class StapleLibrary:
    """In-memory index of staple entries, loaded once from *.yaml files."""

    def __init__(self, staples_dir: Path = _STAPLES_DIR) -> None:
        # slug -> entry; sorted filename order keeps listing deterministic.
        self._staples: dict[str, StapleEntry] = {}
        for yaml_path in sorted(staples_dir.glob("*.yaml")):
            entry = self._load(yaml_path)
            self._staples[entry.slug] = entry

    def get(self, slug: str) -> StapleEntry | None:
        """Return the staple with this slug, or None if unknown."""
        return self._staples.get(slug)

    def list_all(self) -> list[StapleEntry]:
        """Return all loaded staples."""
        return list(self._staples.values())

    def filter_by_dietary(self, label: str) -> list[StapleEntry]:
        """Return staples carrying the given dietary label (e.g. 'vegan')."""
        return [s for s in self._staples.values() if label in s.dietary_labels]

    def _load(self, path: Path) -> StapleEntry:
        """Parse one staple YAML file into a StapleEntry.

        Raises:
            ValueError: on any malformed file, naming the offending path.
        """
        try:
            data = yaml.safe_load(path.read_text())
            return StapleEntry(
                slug=data["slug"],
                name=data["name"],
                description=data.get("description", ""),
                dietary_labels=data.get("dietary_labels", []),
                base_ingredients=data.get("base_ingredients", []),
                base_method=data.get("base_method", ""),
                base_time_minutes=int(data.get("base_time_minutes", 0)),
                yield_formats=data.get("yield_formats", {}),
                compatible_styles=data.get("compatible_styles", []),
            )
        # TypeError covers safe_load returning a non-mapping (e.g. a bare
        # scalar), ValueError covers a non-numeric base_time_minutes — both
        # previously escaped uncaught; this matches StyleAdapter's loader.
        except (KeyError, TypeError, ValueError, yaml.YAMLError) as exc:
            raise ValueError(f"Failed to load staple from {path}: {exc}") from exc

View file

@ -0,0 +1,132 @@
"""
StyleAdapter — cuisine-mode overlay that biases element dimensions.
YAML templates in app/styles/.
"""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
import yaml
_STYLES_DIR = Path(__file__).parents[2] / "styles"
@dataclass(frozen=True)
class StyleTemplate:
    """One cuisine style loaded from YAML: preferred ingredients per element
    dimension plus method/seasoning/finishing-fat biases."""

    style_id: str
    name: str
    aromatics: tuple[str, ...]
    depth_sources: tuple[str, ...]
    brightness_sources: tuple[str, ...]
    method_bias: dict[str, float]
    structure_forms: tuple[str, ...]
    seasoning_bias: str
    finishing_fat_str: str

    @staticmethod
    def _present_in_pantry(candidates: tuple[str, ...], pantry_items: list[str]) -> list[str]:
        """Return candidates matching any pantry item (case-insensitive,
        bidirectional substring). Shared by the three selectors below, which
        previously triplicated this loop."""
        matched: list[str] = []
        for candidate in candidates:
            lowered = candidate.lower()
            for item in pantry_items:
                item_l = item.lower()
                if lowered in item_l or item_l in lowered:
                    matched.append(candidate)
                    break
        return matched

    def bias_aroma_selection(self, pantry_items: list[str]) -> list[str]:
        """Return aromatics present in pantry (bidirectional substring match)."""
        return self._present_in_pantry(self.aromatics, pantry_items)

    def preferred_depth_sources(self, pantry_items: list[str]) -> list[str]:
        """Return depth_sources present in pantry."""
        return self._present_in_pantry(self.depth_sources, pantry_items)

    def preferred_structure_forms(self, pantry_items: list[str]) -> list[str]:
        """Return structure_forms present in pantry."""
        return self._present_in_pantry(self.structure_forms, pantry_items)

    def method_weights(self) -> dict[str, float]:
        """Return a copy of the method bias weights."""
        return dict(self.method_bias)

    def seasoning_vector(self) -> str:
        """Return the seasoning bias."""
        return self.seasoning_bias

    def finishing_fat(self) -> str:
        """Return the finishing fat."""
        return self.finishing_fat_str
class StyleAdapter:
    """Loads StyleTemplate YAML files and serves style-biased guidance."""

    def __init__(self, styles_dir: Path = _STYLES_DIR) -> None:
        self._styles: dict[str, StyleTemplate] = {}
        for yaml_path in sorted(styles_dir.glob("*.yaml")):
            try:
                template = self._load(yaml_path)
                self._styles[template.style_id] = template
            except (KeyError, yaml.YAMLError, TypeError) as exc:
                raise ValueError(f"Failed to load style from {yaml_path}: {exc}") from exc

    @property
    def styles(self) -> dict[str, StyleTemplate]:
        """Mapping of style_id -> template."""
        return self._styles

    def get(self, style_id: str) -> StyleTemplate | None:
        """Template for style_id, or None when unknown."""
        return self._styles.get(style_id)

    def list_all(self) -> list[StyleTemplate]:
        """All loaded templates."""
        return list(self._styles.values())

    def bias_aroma_selection(self, style_id: str, pantry_items: list[str]) -> list[str]:
        """Return pantry items that match the style's preferred aromatics.
        Falls back to all pantry items if no match found."""
        template = self._styles.get(style_id)
        if template is None:
            return pantry_items
        matched: list[str] = []
        for item in pantry_items:
            item_l = item.lower()
            if any(
                aroma.lower() in item_l or item_l in aroma.lower()
                for aroma in template.aromatics
            ):
                matched.append(item)
        return matched or pantry_items

    def apply(self, style_id: str, pantry_items: list[str]) -> dict:
        """Return style-biased ingredient guidance for each element dimension."""
        template = self._styles.get(style_id)
        if template is None:
            return {}
        return {
            "aroma_candidates": self.bias_aroma_selection(style_id, pantry_items),
            "depth_suggestions": list(template.depth_sources),
            "brightness_suggestions": list(template.brightness_sources),
            "method_bias": template.method_bias,
            "structure_forms": list(template.structure_forms),
            "seasoning_bias": template.seasoning_bias,
            "finishing_fat": template.finishing_fat_str,
        }

    def _load(self, path: Path) -> StyleTemplate:
        """Parse one YAML file into a StyleTemplate; missing optionals default empty."""
        raw = yaml.safe_load(path.read_text())
        return StyleTemplate(
            style_id=raw["style_id"],
            name=raw["name"],
            aromatics=tuple(raw.get("aromatics", [])),
            depth_sources=tuple(raw.get("depth_sources", [])),
            brightness_sources=tuple(raw.get("brightness_sources", [])),
            method_bias=dict(raw.get("method_bias", {})),
            structure_forms=tuple(raw.get("structure_forms", [])),
            seasoning_bias=raw.get("seasoning_bias", ""),
            finishing_fat_str=raw.get("finishing_fat", ""),
        )

View file

@ -0,0 +1,126 @@
"""
SubstitutionEngine — deterministic ingredient swap candidates with compensation hints.
Powered by:
- substitution_pairs table (derived from lishuyang/recipepairs)
- ingredient_profiles functional metadata (USDA FDC)
"""
from __future__ import annotations
import json
from dataclasses import dataclass, field
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from app.db.store import Store
# Compensation threshold — if |delta| exceeds this, surface a hint
_FAT_THRESHOLD = 5.0 # grams per 100g
_GLUTAMATE_THRESHOLD = 1.0 # mg per 100g
_MOISTURE_THRESHOLD = 15.0 # grams per 100g
_RICHNESS_COMPENSATORS = ["olive oil", "coconut oil", "butter", "shortening", "full-fat coconut milk"]
_DEPTH_COMPENSATORS = ["nutritional yeast", "soy sauce", "miso", "mushroom powder",
"better than bouillon not-beef", "smoked paprika"]
_MOISTURE_BINDERS = ["cornstarch", "flour", "arrowroot", "breadcrumbs"]
@dataclass(frozen=True)
class CompensationHint:
    """Suggested add-back ingredient when a swap weakens one element dimension."""

    ingredient: str  # what to add
    reason: str      # human-readable explanation surfaced to the user
    element: str     # element being restored, e.g. "Richness" / "Depth" / "Structure"
@dataclass(frozen=True)
class SubstitutionSwap:
    """A ranked substitution candidate with nutrient deltas and compensation hints.

    Deltas are substitute minus original, per 100g (grams except glutamate, mg).
    """

    original_name: str
    substitute_name: str
    constraint_label: str  # dietary constraint this swap satisfies, e.g. "vegan"
    fat_delta: float
    moisture_delta: float
    glutamate_delta: float
    protein_delta: float
    occurrence_count: int  # times this swap was observed in the recipepairs corpus
    compensation_hints: list[dict] = field(default_factory=list)
    explanation: str = ""  # pre-built human-readable summary
class SubstitutionEngine:
    """Deterministic ingredient-swap lookup with element compensation hints."""

    def __init__(self, store: "Store") -> None:
        self._store = store

    def find_substitutes(
        self,
        ingredient_name: str,
        constraint: str,
    ) -> list[SubstitutionSwap]:
        """Return swaps for (ingredient, constraint), most-observed first."""
        rows = self._store._fetch_all("""
            SELECT substitute_name, constraint_label,
                   fat_delta, moisture_delta, glutamate_delta, protein_delta,
                   occurrence_count, compensation_hints
            FROM substitution_pairs
            WHERE original_name = ? AND constraint_label = ?
            ORDER BY occurrence_count DESC
        """, (ingredient_name.lower(), constraint))
        return [self._row_to_swap(ingredient_name, row) for row in rows]

    def _row_to_swap(self, original: str, row: dict) -> SubstitutionSwap:
        """Materialize one DB row into a SubstitutionSwap with hints attached."""
        hints = self._build_hints(row)
        explanation = self._build_explanation(original, row, hints)
        hint_dicts = [
            {"ingredient": h.ingredient, "reason": h.reason, "element": h.element}
            for h in hints
        ]
        return SubstitutionSwap(
            original_name=original,
            substitute_name=row["substitute_name"],
            constraint_label=row["constraint_label"],
            fat_delta=row.get("fat_delta") or 0.0,
            moisture_delta=row.get("moisture_delta") or 0.0,
            glutamate_delta=row.get("glutamate_delta") or 0.0,
            protein_delta=row.get("protein_delta") or 0.0,
            occurrence_count=row.get("occurrence_count") or 1,
            compensation_hints=hint_dicts,
            explanation=explanation,
        )

    def _build_hints(self, row: dict) -> list[CompensationHint]:
        """Emit add-back hints for deltas crossing the element thresholds."""
        fat = row.get("fat_delta") or 0.0
        glutamate = row.get("glutamate_delta") or 0.0
        moisture = row.get("moisture_delta") or 0.0
        hints: list[CompensationHint] = []
        # Large fat loss -> restore Richness with the top compensator.
        if fat < -_FAT_THRESHOLD:
            missing = abs(fat)
            sugg = _RICHNESS_COMPENSATORS[0]
            hints.append(CompensationHint(
                ingredient=sugg,
                reason=f"substitute has ~{missing:.0f}g/100g less fat — add {sugg} to restore Richness",
                element="Richness",
            ))
        # Umami loss -> restore Depth.
        if glutamate < -_GLUTAMATE_THRESHOLD:
            sugg = _DEPTH_COMPENSATORS[0]
            hints.append(CompensationHint(
                ingredient=sugg,
                reason=f"substitute is lower in umami — add {sugg} to restore Depth",
                element="Depth",
            ))
        # Extra moisture -> bind it to keep Structure.
        if moisture > _MOISTURE_THRESHOLD:
            sugg = _MOISTURE_BINDERS[0]
            hints.append(CompensationHint(
                ingredient=sugg,
                reason=f"substitute adds ~{moisture:.0f}g/100g more moisture — add {sugg} to maintain Structure",
                element="Structure",
            ))
        return hints

    def _build_explanation(
        self, original: str, row: dict, hints: list[CompensationHint]
    ) -> str:
        """One-line human summary; appends compensation advice when hints exist."""
        sub = row["substitute_name"]
        count = row.get("occurrence_count") or 1
        pieces = [f"Replace {original} with {sub} (seen in {count} recipes)."]
        if hints:
            pieces.append(" To compensate: " + "; ".join(h.reason for h in hints) + ".")
        return "".join(pieces)

38
app/staples/seitan.yaml Normal file
View file

@ -0,0 +1,38 @@
slug: seitan
name: Seitan (Wheat Meat)
description: High-protein wheat gluten that mimics the texture of meat. Can be made in bulk and stored in multiple formats.
dietary_labels: [vegan, high-protein]
base_ingredients:
- vital wheat gluten
- nutritional yeast
- soy sauce
- garlic powder
- vegetable broth
base_method: simmer
base_time_minutes: 45
yield_formats:
fresh:
elements: [Structure, Depth, Richness]
shelf_days: 5
storage: airtight container, refrigerated in broth
methods: [saute, braise, grill, stir-fry]
texture: chewy, meaty
frozen:
elements: [Structure, Depth]
shelf_days: 90
storage: vacuum-sealed freezer bag
methods: [thaw then any method]
texture: slightly softer after thaw
braised:
elements: [Structure, Depth, Seasoning]
shelf_days: 4
storage: covered in braising liquid, refrigerated
methods: [serve directly, slice for sandwiches]
texture: tender, falling-apart
grilled:
elements: [Structure, Aroma, Texture]
shelf_days: 3
storage: refrigerated, uncovered to maintain crust
methods: [slice cold, reheat in pan]
texture: crisp exterior, chewy interior
compatible_styles: [italian, latin, east_asian, eastern_european]

28
app/staples/tempeh.yaml Normal file
View file

@ -0,0 +1,28 @@
slug: tempeh
name: Tempeh
description: Fermented soybean cake. Dense, nutty, high in protein. Excellent at absorbing marinades.
dietary_labels: [vegan, high-protein, fermented]
base_ingredients:
- tempeh block (store-bought or homemade from soybeans + starter)
base_method: steam then marinate
base_time_minutes: 20
yield_formats:
raw:
elements: [Structure, Depth, Richness]
shelf_days: 7
storage: refrigerated in original packaging or wrapped
methods: [steam, crumble, slice]
texture: dense, firm
marinated:
elements: [Structure, Depth, Seasoning, Aroma]
shelf_days: 5
storage: submerged in marinade, refrigerated
methods: [bake, pan-fry, grill]
texture: chewy, flavor-dense
crumbled:
elements: [Structure, Depth, Texture]
shelf_days: 3
storage: refrigerated, use quickly
methods: [saute as ground meat substitute, add to tacos or pasta]
texture: crumbly, browned bits
compatible_styles: [latin, east_asian, mediterranean]

View file

@ -0,0 +1,34 @@
slug: tofu_firm
name: Firm Tofu
description: Pressed soybean curd. Neutral flavor, excellent at absorbing surrounding flavors. Freeze-thaw cycle creates meatier texture.
dietary_labels: [vegan, high-protein]
base_ingredients:
- firm or extra-firm tofu block
base_method: press (30 min) then prepare
base_time_minutes: 30
yield_formats:
pressed_raw:
elements: [Structure]
shelf_days: 5
storage: submerged in water, refrigerated, change water daily
methods: [cube, slice, crumble]
texture: dense, uniform
freeze_thawed:
elements: [Structure, Texture]
shelf_days: 5
storage: refrigerated after thawing
methods: [squeeze dry, saute, bake]
texture: chewy, porous, absorbs marinades deeply
baked:
elements: [Structure, Texture, Aroma]
shelf_days: 4
storage: refrigerated, uncovered
methods: [add to stir-fry, bowl, salad]
texture: crisp exterior, chewy interior
silken:
elements: [Richness, Structure]
shelf_days: 3
storage: refrigerated, use within days of opening
methods: [blend into sauces, custards, dressings]
texture: silky, smooth
compatible_styles: [east_asian, mediterranean]

View file

@ -0,0 +1,13 @@
style_id: east_asian
name: East Asian
aromatics: [ginger, scallion, sesame, star anise, five spice, sichuan pepper, lemongrass]
depth_sources: [soy sauce, miso, oyster sauce, shiitake, fish sauce, bonito]
brightness_sources: [rice vinegar, mirin, citrus zest, ponzu]
method_bias:
stir_fry: 0.35
steam: 0.25
braise: 0.20
boil: 0.20
structure_forms: [dumpling wrapper, thin noodle, rice, bao]
seasoning_bias: soy sauce
finishing_fat: toasted sesame oil

View file

@ -0,0 +1,13 @@
style_id: eastern_european
name: Eastern European
aromatics: [dill, caraway, marjoram, parsley, horseradish, bay leaf]
depth_sources: [sour cream, smoked meats, bacon, dried mushrooms]
brightness_sources: [sauerkraut brine, apple cider vinegar, sour cream]
method_bias:
braise: 0.35
boil: 0.30
bake: 0.25
roast: 0.10
structure_forms: [dumpling wrapper, bread dough, stuffed cabbage]
seasoning_bias: kosher salt
finishing_fat: butter or lard

13
app/styles/italian.yaml Normal file
View file

@ -0,0 +1,13 @@
style_id: italian
name: Italian
aromatics: [basil, oregano, garlic, onion, fennel, rosemary, thyme, sage, marjoram]
depth_sources: [parmesan, pecorino, anchovies, canned tomato, porcini mushrooms]
brightness_sources: [lemon, white wine, tomato, red wine vinegar]
method_bias:
braise: 0.30
roast: 0.30
saute: 0.25
simmer: 0.15
structure_forms: [pasta, wrapped, layered, risotto]
seasoning_bias: sea salt
finishing_fat: olive oil

13
app/styles/latin.yaml Normal file
View file

@ -0,0 +1,13 @@
style_id: latin
name: Latin
aromatics: [cumin, chili, cilantro, epazote, mexican oregano, ancho, chipotle, smoked paprika]
depth_sources: [dried chilis, smoked peppers, chocolate, achiote]
brightness_sources: [lime, tomatillo, brined jalapeño, orange]
method_bias:
roast: 0.30
braise: 0.30
fry: 0.25
grill: 0.15
structure_forms: [wrapped in masa, pastry, stuffed, bowl]
seasoning_bias: kosher salt
finishing_fat: lard or neutral oil

View file

@ -0,0 +1,13 @@
style_id: mediterranean
name: Mediterranean
aromatics: [oregano, thyme, rosemary, mint, sumac, za'atar, preserved lemon]
depth_sources: [tahini, feta, halloumi, dried olives, harissa]
brightness_sources: [lemon, pomegranate molasses, yogurt, sumac]
method_bias:
roast: 0.35
grill: 0.30
braise: 0.25
saute: 0.10
structure_forms: [flatbread, stuffed vegetables, grain bowl, mezze plate]
seasoning_bias: sea salt
finishing_fat: olive oil

View file

@ -25,6 +25,8 @@ KIWI_FEATURES: dict[str, str] = {
"receipt_upload": "free",
"expiry_alerts": "free",
"export_csv": "free",
"leftover_mode": "free", # Rate-limited at API layer, not tier-gated
"staple_library": "free",
# Paid tier
"receipt_ocr": "paid", # BYOK-unlockable
@ -32,11 +34,11 @@ KIWI_FEATURES: dict[str, str] = {
"expiry_llm_matching": "paid", # BYOK-unlockable
"meal_planning": "paid",
"dietary_profiles": "paid",
"style_picker": "paid",
# Premium tier
"multi_household": "premium",
"background_monitoring": "premium",
"leftover_mode": "premium",
}
@ -47,6 +49,7 @@ def can_use(feature: str, tier: str, has_byok: bool = False) -> bool:
tier,
has_byok=has_byok,
_features=KIWI_FEATURES,
_byok_unlockable=KIWI_BYOK_UNLOCKABLE,
)
@ -54,7 +57,12 @@ def require_feature(feature: str, tier: str, has_byok: bool = False) -> None:
"""Raise ValueError if the tier cannot access the feature."""
if not can_use(feature, tier, has_byok):
from circuitforge_core.tiers.tiers import tier_label
needed = tier_label(feature, has_byok=has_byok, _features=KIWI_FEATURES)
needed = tier_label(
feature,
has_byok=has_byok,
_features=KIWI_FEATURES,
_byok_unlockable=KIWI_BYOK_UNLOCKABLE,
)
raise ValueError(
f"Feature '{feature}' requires {needed} tier. "
f"Current tier: {tier}."

View file

@ -16,3 +16,11 @@ dependencies:
- httpx>=0.27
- pydantic>=2.5
- PyJWT>=2.8
- datasets
- huggingface_hub
- transformers
- sentence-transformers
- torch
- pyyaml
- pandas
- pyarrow

0
scripts/__init__.py Normal file
View file

View file

View file

@ -0,0 +1,79 @@
"""
Import FlavorGraph compound->ingredient map into flavor_molecules table.
FlavorGraph GitHub: https://github.com/lamypark/FlavorGraph
Download: git clone https://github.com/lamypark/FlavorGraph /tmp/flavorgraph
Usage:
conda run -n job-seeker python scripts/pipeline/build_flavorgraph_index.py \
--db /path/to/kiwi.db \
--graph-json /tmp/flavorgraph/data/graph.json
"""
from __future__ import annotations
import argparse
import json
import sqlite3
from collections import defaultdict
from pathlib import Path
def parse_ingredient_nodes(graph: dict) -> dict[str, list[str]]:
    """Return {ingredient_name: [compound_id, ...]} from a FlavorGraph JSON."""
    # node_id -> lowercased ingredient name, to resolve link endpoints.
    id_to_name: dict[str, str] = {
        node["id"]: node["name"].lower()
        for node in graph.get("nodes", [])
        if node.get("type") == "ingredient"
    }
    compounds_by_ingredient: dict[str, list[str]] = defaultdict(list)
    for link in graph.get("links", []):
        src = link.get("source", "")
        tgt = link.get("target", "")
        # Each link endpoint that is an ingredient collects the other endpoint.
        if src in id_to_name:
            compounds_by_ingredient[id_to_name[src]].append(tgt)
        if tgt in id_to_name:
            compounds_by_ingredient[id_to_name[tgt]].append(src)
    return dict(compounds_by_ingredient)
def build(db_path: Path, graph_json_path: Path) -> None:
    """Index a FlavorGraph JSON dump into the SQLite database.

    Writes:
      - ingredient_profiles.flavor_molecule_ids: JSON list of compound ids
      - flavor_molecules: one row per compound with its ingredient list
    """
    graph = json.loads(graph_json_path.read_text())
    ingredient_map = parse_ingredient_nodes(graph)
    # Invert ingredient->compounds into compound->ingredients and collect
    # human-readable compound names from the node list.
    compound_ingredients: dict[str, list[str]] = defaultdict(list)
    compound_names: dict[str, str] = {}
    for node in graph.get("nodes", []):
        if node.get("type") == "compound":
            compound_names[node["id"]] = node["name"]
    for ingredient, compounds in ingredient_map.items():
        for cid in compounds:
            compound_ingredients[cid].append(ingredient)
    conn = sqlite3.connect(db_path)
    try:
        for ingredient, compounds in ingredient_map.items():
            conn.execute("""
                UPDATE ingredient_profiles
                SET flavor_molecule_ids = ?
                WHERE name = ?
            """, (json.dumps(compounds), ingredient))
        for cid, ingredients in compound_ingredients.items():
            conn.execute("""
                INSERT OR IGNORE INTO flavor_molecules (compound_id, compound_name, ingredient_names)
                VALUES (?, ?, ?)
            """, (cid, compound_names.get(cid, cid), json.dumps(ingredients)))
        conn.commit()
    finally:
        # Always release the connection, even on a failed statement —
        # previously a failure mid-import leaked it; matches the other
        # pipeline scripts' try/finally pattern.
        conn.close()
    print(f"Indexed {len(ingredient_map)} ingredients, {len(compound_ingredients)} compounds")
if __name__ == "__main__":
    # CLI entry point: index a FlavorGraph JSON dump into the given SQLite db.
    parser = argparse.ArgumentParser()
    parser.add_argument("--db", required=True, type=Path)
    parser.add_argument("--graph-json", required=True, type=Path)
    args = parser.parse_args()
    build(args.db, args.graph_json)

View file

@ -0,0 +1,134 @@
"""
Build ingredient_profiles table from USDA FDC (Food Data Central) data.
Usage:
conda run -n job-seeker python scripts/pipeline/build_ingredient_index.py \
--db /path/to/kiwi.db \
--usda-fdc data/usda_fdc_cleaned.parquet \
--usda-branded data/usda_branded.parquet
"""
from __future__ import annotations

import argparse
import json
import re
import sqlite3
from collections.abc import Callable
from pathlib import Path

import pandas as pd
# ── Element derivation rules (threshold-based) ────────────────────────────
_ELEMENT_RULES: list[tuple[str, callable]] = [
("Richness", lambda r: r.get("fat_pct", 0) > 5.0),
("Seasoning", lambda r: r.get("sodium_mg_per_100g", 0) > 200),
("Depth", lambda r: r.get("glutamate_mg", 0) > 1.0),
("Structure", lambda r: r.get("starch_pct", 0) > 10.0 or r.get("binding_score", 0) >= 2),
("Texture", lambda r: r.get("water_activity", 1.0) < 0.6), # low water = likely crunchy/dry
]
_ACID_KEYWORDS = ["vinegar", "lemon", "lime", "citric", "tartaric", "kombucha", "kefir",
"yogurt", "buttermilk", "wine", "tomato"]
_AROMA_KEYWORDS = ["garlic", "onion", "herb", "spice", "basil", "oregano", "cumin",
"ginger", "cinnamon", "pepper", "chili", "paprika", "thyme", "rosemary",
"cilantro", "parsley", "dill", "fennel", "cardamom", "turmeric"]
_FERMENTED_KEYWORDS = ["miso", "soy sauce", "kimchi", "sauerkraut", "kefir", "yogurt",
"kombucha", "tempeh", "natto", "vinegar", "nutritional yeast"]
def normalize_name(raw: str) -> str:
    """Lowercase, strip parentheticals and trailing descriptors."""
    cleaned = raw.lower().strip()
    cleaned = re.sub(r"\(.*?\)", "", cleaned)  # drop e.g. "(85% lean)"
    cleaned = re.sub(r",.*$", "", cleaned)     # drop ", shredded" and beyond
    # Collapse runs of whitespace left behind by the removals above.
    return re.sub(r"\s+", " ", cleaned).strip()
def derive_elements(row: dict) -> list[str]:
    """Derive element tags for one nutrient row: threshold rules first,
    then name-keyword checks for Brightness and Aroma."""
    elements = [element for element, check in _ELEMENT_RULES if check(row)]
    lowered = row.get("name", "").lower()
    if any(keyword in lowered for keyword in _ACID_KEYWORDS):
        elements.append("Brightness")
    if any(keyword in lowered for keyword in _AROMA_KEYWORDS):
        elements.append("Aroma")
    # Dedup while preserving first-seen order.
    return list(dict.fromkeys(elements))
def derive_binding_score(row: dict) -> int:
    """Score 0-3 for how strongly an ingredient binds a mixture,
    from its starch and protein content (per 100g)."""
    protein = row.get("protein_pct", 0)
    starch = row.get("starch_pct", 0)
    strong_binder = starch > 50 or (protein > 10 and starch > 20)
    if strong_binder:
        return 3
    if starch > 20 or protein > 12:
        return 2
    return 1 if (starch > 5 or protein > 6) else 0
def build(db_path: Path, usda_fdc_path: Path, usda_branded_path: Path) -> None:
    """Populate the ingredient_profiles table from USDA FDC parquet exports.

    Rows are written with INSERT OR IGNORE, so re-runs are idempotent.
    Per-row failures are skipped (best-effort bulk import) rather than
    aborting the whole run.
    """
    conn = sqlite3.connect(db_path)
    try:
        conn.execute("PRAGMA foreign_keys=ON")
        df_fdc = pd.read_parquet(usda_fdc_path)
        # NOTE(review): the branded dataset is read (which at least validates
        # the file) but never merged into the frame below — TODO fold it into
        # the unified schema or drop the parameter.
        df_branded = pd.read_parquet(usda_branded_path)
        # Rename columns to unified schema
        fdc_col_map = {
            "food_item": "name",
            "Total lipid (fat)": "fat_pct",
            "Protein": "protein_pct",
            "Carbohydrate, by difference": "carb_pct",
            "Fiber, total dietary": "fiber_pct",
            "Sodium, Na": "sodium_mg_per_100g",
            "Water": "moisture_pct",
        }
        df = df_fdc.rename(columns={k: v for k, v in fdc_col_map.items() if k in df_fdc.columns})
        inserted = 0
        for _, row in df.iterrows():
            name = normalize_name(str(row.get("name", "")))
            if not name or len(name) < 2:
                continue
            r = {
                "name": name,
                "fat_pct": float(row.get("fat_pct") or 0),
                "protein_pct": float(row.get("protein_pct") or 0),
                "moisture_pct": float(row.get("moisture_pct") or 0),
                "sodium_mg_per_100g": float(row.get("sodium_mg_per_100g") or 0),
                "starch_pct": 0.0,  # not in the FDC export; defaulted
            }
            r["binding_score"] = derive_binding_score(r)
            r["elements"] = derive_elements(r)
            r["is_fermented"] = int(any(k in name for k in _FERMENTED_KEYWORDS))
            try:
                conn.execute("""
                    INSERT OR IGNORE INTO ingredient_profiles
                    (name, elements, fat_pct, fat_saturated_pct, moisture_pct,
                     protein_pct, starch_pct, binding_score, sodium_mg_per_100g,
                     is_fermented, source)
                    VALUES (?,?,?,?,?,?,?,?,?,?,?)
                """, (
                    r["name"], json.dumps(r["elements"]),
                    r["fat_pct"], 0.0, r["moisture_pct"],
                    r["protein_pct"], r["starch_pct"], r["binding_score"],
                    r["sodium_mg_per_100g"], r["is_fermented"], "usda_fdc",
                ))
                # changes() is 0 when the name already existed (OR IGNORE path).
                inserted += conn.execute("SELECT changes()").fetchone()[0]
            except Exception:
                # Best-effort: one malformed row must not abort the import.
                continue
        conn.commit()
    finally:
        # Always release the connection, even when parquet parsing or SQL
        # fails — previously a failure here leaked it; matches the other
        # pipeline scripts' try/finally pattern.
        conn.close()
    print(f"Inserted {inserted} ingredient profiles from USDA FDC")
if __name__ == "__main__":
    # CLI entry point: build ingredient_profiles from USDA parquet exports.
    parser = argparse.ArgumentParser()
    parser.add_argument("--db", required=True, type=Path)
    parser.add_argument("--usda-fdc", required=True, type=Path)
    parser.add_argument("--usda-branded", required=True, type=Path)
    args = parser.parse_args()
    build(args.db, args.usda_fdc, args.usda_branded)

View file

@ -0,0 +1,147 @@
"""
Import food.com recipe corpus into recipes table.
Usage:
conda run -n job-seeker python scripts/pipeline/build_recipe_index.py \
--db /path/to/kiwi.db \
--recipes data/recipes_foodcom.parquet \
--batch-size 10000
"""
from __future__ import annotations
import argparse
import json
import re
import sqlite3
from pathlib import Path
import pandas as pd
# Leading quantity + unit (incl. unicode fractions ¼ ½ ¾ ⅓ ⅔) at the start of
# an ingredient line, e.g. "2 cups", "1/2 tbsp", "1 clove".
_MEASURE_PATTERN = re.compile(
    r"^\d[\d\s/\u00bc\u00bd\u00be\u2153\u2154]*\s*(cup|tbsp|tsp|oz|lb|g|kg|ml|l|clove|slice|piece|can|pkg|package|bunch|head|stalk|sprig|pinch|dash|to taste|as needed)s?\b",
    re.IGNORECASE,
)
# Any leading number/fraction run left over when no recognized unit matched.
_LEAD_NUMBER = re.compile(r"^\d[\d\s/\u00bc\u00bd\u00be\u2153\u2154]*\s*")
# Trailing qualifiers that carry no ingredient identity.
_TRAILING_QUALIFIER = re.compile(
    r"\s*(to taste|as needed|or more|or less|optional|if desired|if needed)\s*$",
    re.IGNORECASE,
)
def extract_ingredient_names(raw_list: list[str]) -> list[str]:
    """Strip quantities and units from ingredient strings -> normalized names."""
    cleaned: list[str] = []
    for entry in raw_list:
        # Order matters: measures first, then stray numbers, then descriptors.
        name = entry.lower().strip()
        name = _MEASURE_PATTERN.sub("", name)
        name = _LEAD_NUMBER.sub("", name)
        name = re.sub(r"\(.*?\)", "", name)
        name = re.sub(r",.*$", "", name)
        name = _TRAILING_QUALIFIER.sub("", name)
        name = name.strip(" -.,")
        # Single characters are noise left over from stripping.
        if len(name) > 1:
            cleaned.append(name)
    return cleaned
def compute_element_coverage(profiles: list[dict]) -> dict[str, float]:
    """Fraction of profiled ingredients contributing each element, rounded to 3dp."""
    if not profiles:
        return {}
    total = len(profiles)
    tally: dict[str, int] = {}
    for profile in profiles:
        for element in profile.get("elements", []):
            tally[element] = tally.get(element, 0) + 1
    return {element: round(count / total, 3) for element, count in tally.items()}
def build(db_path: Path, recipes_path: Path, batch_size: int = 10000) -> None:
    """Bulk-import the food.com parquet corpus into the recipes table.

    Inserts in batches of batch_size with INSERT OR IGNORE (idempotent
    re-runs); element coverage is precomputed from ingredient_profiles
    loaded up front to avoid an N+1 query per recipe.
    """
    insert_sql = """
        INSERT OR IGNORE INTO recipes
        (external_id, title, ingredients, ingredient_names, directions,
        category, keywords, calories, fat_g, protein_g, sodium_mg, element_coverage)
        VALUES (?,?,?,?,?,?,?,?,?,?,?,?)
    """
    conn = sqlite3.connect(db_path)
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        # Pre-load ingredient element profiles to avoid N+1 queries
        profile_index: dict[str, list[str]] = {}
        for row in conn.execute("SELECT name, elements FROM ingredient_profiles"):
            try:
                profile_index[row[0]] = json.loads(row[1])
            except Exception:
                pass  # skip profiles with unparsable element JSON
        df = pd.read_parquet(recipes_path)
        inserted = 0
        batch: list[tuple] = []

        def _flush() -> int:
            """Write the pending batch; return the number of newly inserted rows.

            Replaces two previously duplicated insert/commit code paths.
            """
            before = conn.total_changes
            conn.executemany(insert_sql, batch)
            conn.commit()
            return conn.total_changes - before

        for _, row in df.iterrows():
            raw_ingredients = row.get("RecipeIngredientParts", [])
            if isinstance(raw_ingredients, str):
                # Some exports store the list JSON-encoded.
                try:
                    raw_ingredients = json.loads(raw_ingredients)
                except Exception:
                    raw_ingredients = [raw_ingredients]
            raw_ingredients = [str(i) for i in (raw_ingredients or [])]
            ingredient_names = extract_ingredient_names(raw_ingredients)
            profiles = [
                {"elements": profile_index[name]}
                for name in ingredient_names
                if name in profile_index
            ]
            coverage = compute_element_coverage(profiles)
            directions = row.get("RecipeInstructions", [])
            if isinstance(directions, str):
                try:
                    directions = json.loads(directions)
                except Exception:
                    directions = [directions]
            batch.append((
                str(row.get("RecipeId", "")),
                str(row.get("Name", ""))[:500],  # guard against pathological titles
                json.dumps(raw_ingredients),
                json.dumps(ingredient_names),
                json.dumps([str(d) for d in (directions or [])]),
                str(row.get("RecipeCategory", "") or ""),
                json.dumps(list(row.get("Keywords", []) or [])),
                float(row.get("Calories") or 0) or None,
                float(row.get("FatContent") or 0) or None,
                float(row.get("ProteinContent") or 0) or None,
                float(row.get("SodiumContent") or 0) or None,
                json.dumps(coverage),
            ))
            if len(batch) >= batch_size:
                inserted += _flush()
                print(f"  {inserted} recipes inserted...")
                batch = []
        if batch:
            inserted += _flush()
    finally:
        conn.close()
    print(f"Total: {inserted} recipes inserted")
if __name__ == "__main__":
    # CLI entry point: import a food.com recipe parquet into the given db.
    parser = argparse.ArgumentParser()
    parser.add_argument("--db", required=True, type=Path)
    parser.add_argument("--recipes", required=True, type=Path)
    parser.add_argument("--batch-size", type=int, default=10000)
    args = parser.parse_args()
    build(args.db, args.recipes, args.batch_size)

View file

@ -0,0 +1,108 @@
"""
Derive substitution pairs by diffing lishuyang/recipepairs.
GPL-3.0 source -- derived annotations only, raw pairs not shipped.
Usage:
conda run -n job-seeker python scripts/pipeline/derive_substitutions.py \
--db /path/to/kiwi.db \
--recipepairs data/recipepairs.parquet
"""
from __future__ import annotations
import argparse
import json
import sqlite3
from collections import defaultdict
from pathlib import Path
import pandas as pd
from scripts.pipeline.build_recipe_index import extract_ingredient_names
CONSTRAINT_COLS = ["vegan", "vegetarian", "dairy_free", "low_calorie",
"low_carb", "low_fat", "low_sodium", "gluten_free"]
def diff_ingredients(base: list[str], target: list[str]) -> tuple[list[str], list[str]]:
    """Return (removed, added) ingredient names between two recipes.

    ``removed`` holds names present only in *base*; ``added`` holds names
    present only in *target*. Duplicates collapse via set semantics.

    Fix: the original returned ``list(set_diff)``, whose order depends on
    set iteration and is therefore nondeterministic across runs; results
    are now sorted so pipeline output is reproducible. Callers only rely
    on length and membership, so this is backward-compatible.
    """
    base_set = set(base)
    target_set = set(target)
    removed = sorted(base_set - target_set)
    added = sorted(target_set - base_set)
    return removed, added
def build(db_path: Path, recipepairs_path: Path) -> None:
    """Derive substitution pairs from the recipepairs dataset and write them to SQLite.

    Pipeline: load each recipe's ingredient list from the ``recipes`` table,
    diff every (base, target) pair from the parquet file, keep only pairs that
    differ by exactly one ingredient, count occurrences per dietary constraint,
    then persist pairs seen at least 3 times along with nutrient deltas.
    """
    conn = sqlite3.connect(db_path)
    try:
        print("Loading recipe ingredient index...")
        # external_id -> list of normalized ingredient names for that recipe.
        recipe_ingredients: dict[str, list[str]] = {}
        for row in conn.execute("SELECT external_id, ingredient_names FROM recipes"):
            recipe_ingredients[str(row[0])] = json.loads(row[1])
        df = pd.read_parquet(recipepairs_path)
        # (original, substitute, constraint) -> {"count": occurrences}.
        pair_counts: dict[tuple, dict] = defaultdict(lambda: {"count": 0})
        print("Diffing recipe pairs...")
        for _, row in df.iterrows():
            base_id = str(row.get("base", ""))
            target_id = str(row.get("target", ""))
            base_ings = recipe_ingredients.get(base_id, [])
            target_ings = recipe_ingredients.get(target_id, [])
            if not base_ings or not target_ings:
                # One side wasn't in our recipe index — can't diff.
                continue
            removed, added = diff_ingredients(base_ings, target_ings)
            if len(removed) != 1 or len(added) != 1:
                # Only single-swap pairs are unambiguous substitutions.
                continue
            original = removed[0]
            substitute = added[0]
            # Tag the pair with every constraint flag set on this row.
            constraints = [c for c in CONSTRAINT_COLS if row.get(c, 0)]
            for constraint in constraints:
                key = (original, substitute, constraint)
                pair_counts[key]["count"] += 1
        def get_profile(name: str) -> dict:
            # Nutrient lookup; missing ingredients fall back to all-zero deltas.
            row = conn.execute(
                "SELECT fat_pct, moisture_pct, glutamate_mg, protein_pct "
                "FROM ingredient_profiles WHERE name = ?", (name,)
            ).fetchone()
            if row:
                return {"fat": row[0] or 0, "moisture": row[1] or 0,
                        "glutamate": row[2] or 0, "protein": row[3] or 0}
            return {"fat": 0, "moisture": 0, "glutamate": 0, "protein": 0}
        print("Writing substitution pairs...")
        inserted = 0
        for (original, substitute, constraint), data in pair_counts.items():
            if data["count"] < 3:
                # Require 3+ sightings to filter out noise pairs.
                continue
            p_orig = get_profile(original)
            p_sub = get_profile(substitute)
            conn.execute("""
                INSERT OR REPLACE INTO substitution_pairs
                (original_name, substitute_name, constraint_label,
                 fat_delta, moisture_delta, glutamate_delta, protein_delta,
                 occurrence_count, source)
                VALUES (?,?,?,?,?,?,?,?,?)
            """, (
                original, substitute, constraint,
                round(p_sub["fat"] - p_orig["fat"], 2),
                round(p_sub["moisture"] - p_orig["moisture"], 2),
                round(p_sub["glutamate"] - p_orig["glutamate"], 2),
                round(p_sub["protein"] - p_orig["protein"], 2),
                data["count"], "derived",
            ))
            inserted += 1
        conn.commit()
    finally:
        conn.close()
    print(f"Inserted {inserted} substitution pairs (min 3 occurrences)")
if __name__ == "__main__":
    # CLI: --db <sqlite path> --recipepairs <parquet path>
    cli = argparse.ArgumentParser()
    cli.add_argument("--db", required=True, type=Path)
    cli.add_argument("--recipepairs", required=True, type=Path)
    opts = cli.parse_args()
    build(opts.db, opts.recipepairs)

View file

@ -0,0 +1,44 @@
"""
Download recipe engine datasets from HuggingFace.
Usage:
conda run -n job-seeker python scripts/pipeline/download_datasets.py --data-dir /path/to/data
Downloads:
- AkashPS11/recipes_data_food.com (MIT) data/recipes_foodcom.parquet
- omid5/usda-fdc-foods-cleaned (CC0) data/usda_fdc_cleaned.parquet
- jacktol/usda-branded-food-data (MIT) data/usda_branded.parquet
- lishuyang/recipepairs (GPL-3.0 ) data/recipepairs.parquet [derive only, don't ship]
"""
from __future__ import annotations
import argparse
from pathlib import Path
from datasets import load_dataset
# (huggingface repo path, split, local parquet filename) for each dataset.
DATASETS = [
    ("AkashPS11/recipes_data_food.com", "train", "recipes_foodcom.parquet"),
    ("omid5/usda-fdc-foods-cleaned", "train", "usda_fdc_cleaned.parquet"),
    ("jacktol/usda-branded-food-data", "train", "usda_branded.parquet"),
    ("lishuyang/recipepairs", "train", "recipepairs.parquet"),
]


def download_all(data_dir: Path) -> None:
    """Download each configured dataset split into *data_dir* as parquet.

    Files that already exist are skipped, making the script safe to re-run.
    """
    data_dir.mkdir(parents=True, exist_ok=True)
    for hf_path, split, filename in DATASETS:
        out = data_dir / filename
        if out.exists():
            # Fix: the old message was a placeholder-less f-string that printed
            # the literal "(unknown)" — name the file actually being skipped.
            print(f"  skip {filename} (already exists)")
            continue
        print(f"  downloading {hf_path} ...")
        ds = load_dataset(hf_path, split=split)
        ds.to_parquet(str(out))
        print(f"  saved → {out}")
if __name__ == "__main__":
    # CLI: --data-dir <directory to place parquet files in>
    cli = argparse.ArgumentParser()
    cli.add_argument("--data-dir", required=True, type=Path)
    opts = cli.parse_args()
    download_all(opts.data_dir)

0
tests/__init__.py Normal file
View file

0
tests/api/__init__.py Normal file
View file

78
tests/api/test_recipes.py Normal file
View file

@ -0,0 +1,78 @@
import pytest
from fastapi.testclient import TestClient
from unittest.mock import MagicMock
from app.main import app
from app.cloud_session import get_session
from app.db.session import get_store
client = TestClient(app)
def _make_session(tier: str = "free", has_byok: bool = False) -> MagicMock:
mock = MagicMock()
mock.tier = tier
mock.has_byok = has_byok
return mock
def _make_store() -> MagicMock:
mock = MagicMock()
mock.search_recipes_by_ingredients.return_value = [
{
"id": 1,
"title": "Butter Pasta",
"ingredient_names": ["butter", "pasta"],
"element_coverage": {"Richness": 0.5},
"match_count": 2,
"directions": ["mix and heat"],
}
]
mock.check_and_increment_rate_limit.return_value = (True, 1)
return mock
@pytest.fixture(autouse=True)
def override_deps():
    """Install stub session/store dependencies for every test, then clean up."""
    sess = _make_session()
    store = _make_store()
    app.dependency_overrides.update({
        get_session: lambda: sess,
        get_store: lambda: store,
    })
    yield sess, store
    app.dependency_overrides.clear()
def test_suggest_returns_200():
    """Happy path: Level 1 suggest returns 200 with every result section present."""
    resp = client.post("/api/v1/recipes/suggest", json={
        "pantry_items": ["butter", "pasta"],
        "level": 1,
        "constraints": [],
    })
    assert resp.status_code == 200
    data = resp.json()
    # Response shape: suggestions plus the gap / grocery side channels.
    assert "suggestions" in data
    assert "element_gaps" in data
    assert "grocery_list" in data
    assert "grocery_links" in data


def test_suggest_level4_requires_wildcard_confirmed():
    """Level 4 without wildcard_confirmed=True is rejected with 400."""
    resp = client.post("/api/v1/recipes/suggest", json={
        "pantry_items": ["butter"],
        "level": 4,
        "constraints": [],
        "wildcard_confirmed": False,
    })
    assert resp.status_code == 400


def test_suggest_level3_requires_paid_tier(override_deps):
    """Level 3 on the free tier without BYOK is rejected with 403."""
    # The endpoint copies tier/has_byok from the session, so mutate the stub.
    session_mock, _ = override_deps
    session_mock.tier = "free"
    session_mock.has_byok = False
    resp = client.post("/api/v1/recipes/suggest", json={
        "pantry_items": ["butter"],
        "level": 3,
        "constraints": [],
    })
    assert resp.status_code == 403

110
tests/api/test_settings.py Normal file
View file

@ -0,0 +1,110 @@
"""Tests for user settings endpoints."""
from __future__ import annotations
import json
from unittest.mock import MagicMock
import pytest
from fastapi.testclient import TestClient
from app.cloud_session import get_session
from app.db.session import get_store
from app.main import app
from app.models.schemas.recipe import RecipeRequest
from app.services.recipe.recipe_engine import RecipeEngine
client = TestClient(app)
def _make_session(tier: str = "free", has_byok: bool = False) -> MagicMock:
mock = MagicMock()
mock.tier = tier
mock.has_byok = has_byok
return mock
def _make_store() -> MagicMock:
mock = MagicMock()
mock.get_setting.return_value = None
mock.set_setting.return_value = None
mock.search_recipes_by_ingredients.return_value = []
mock.check_and_increment_rate_limit.return_value = (True, 1)
return mock
@pytest.fixture()
def tmp_store() -> MagicMock:
    """Install stub session/store dependency overrides; yield the store stub."""
    stub_session = _make_session()
    stub_store = _make_store()
    app.dependency_overrides.update({
        get_session: lambda: stub_session,
        get_store: lambda: stub_store,
    })
    yield stub_store
    app.dependency_overrides.clear()
def test_set_and_get_cooking_equipment(tmp_store: MagicMock) -> None:
    """PUT then GET round-trips the cooking_equipment value."""
    equipment_json = '["oven", "stovetop"]'
    # PUT stores the value
    put_resp = client.put(
        "/api/v1/settings/cooking_equipment",
        json={"value": equipment_json},
    )
    assert put_resp.status_code == 200
    assert put_resp.json()["key"] == "cooking_equipment"
    assert put_resp.json()["value"] == equipment_json
    tmp_store.set_setting.assert_called_once_with("cooking_equipment", equipment_json)
    # GET returns the stored value
    tmp_store.get_setting.return_value = equipment_json
    get_resp = client.get("/api/v1/settings/cooking_equipment")
    assert get_resp.status_code == 200
    assert get_resp.json()["value"] == equipment_json


def test_get_missing_setting_returns_404(tmp_store: MagicMock) -> None:
    """GET an allowed key that was never set returns 404."""
    tmp_store.get_setting.return_value = None
    resp = client.get("/api/v1/settings/cooking_equipment")
    assert resp.status_code == 404


def test_hard_day_mode_uses_equipment_setting(tmp_store: MagicMock) -> None:
    """RecipeEngine.suggest() respects cooking_equipment from store when hard_day_mode=True."""
    equipment_json = '["microwave"]'
    tmp_store.get_setting.return_value = equipment_json
    engine = RecipeEngine(store=tmp_store)
    req = RecipeRequest(
        pantry_items=["rice", "water"],
        level=1,
        constraints=[],
        hard_day_mode=True,
    )
    result = engine.suggest(req)
    # Engine should have read the equipment setting
    tmp_store.get_setting.assert_called_with("cooking_equipment")
    # Result is a valid RecipeResult (no crash)
    assert result is not None
    assert hasattr(result, "suggestions")


def test_put_unknown_key_returns_422(tmp_store: MagicMock) -> None:
    """PUT to an unknown settings key returns 422."""
    # NOTE(review): presumably the 422 comes from the router's allowed-key
    # validation rather than the body schema — confirm against the endpoint.
    resp = client.put(
        "/api/v1/settings/nonexistent_key",
        json={"value": "something"},
    )
    assert resp.status_code == 422


def test_put_null_value_returns_422(tmp_store: MagicMock) -> None:
    """PUT with a null value returns 422 (Pydantic validation)."""
    resp = client.put(
        "/api/v1/settings/cooking_equipment",
        json={"value": None},
    )
    assert resp.status_code == 422

0
tests/db/__init__.py Normal file
View file

View file

@ -0,0 +1,44 @@
import json, pytest
from tests.services.recipe.test_element_classifier import store_with_profiles
@pytest.fixture
def store_with_recipes(store_with_profiles):
    """Extend the profile-seeded store with two recipes for search/rate-limit tests."""
    store_with_profiles.conn.executemany("""
        INSERT INTO recipes (external_id, title, ingredients, ingredient_names,
                             directions, category, keywords, element_coverage)
        VALUES (?,?,?,?,?,?,?,?)
    """, [
        # JSON columns are stored as serialized text, matching the pipeline output.
        ("1", "Butter Pasta", '["butter","pasta","parmesan"]',
         '["butter","pasta","parmesan"]', '["boil pasta","toss with butter"]',
         "Italian", '["quick","pasta"]',
         '{"Richness":0.5,"Depth":0.3,"Structure":0.2}'),
        ("2", "Lentil Soup", '["lentils","carrots","onion","broth"]',
         '["lentils","carrots","onion","broth"]', '["simmer all"]',
         "Soup", '["vegan","hearty"]',
         '{"Depth":0.4,"Seasoning":0.3}'),
    ])
    store_with_profiles.conn.commit()
    return store_with_profiles
def test_search_recipes_by_ingredient_names(store_with_recipes):
    """Searching with two of 'Butter Pasta''s ingredients finds that recipe."""
    results = store_with_recipes.search_recipes_by_ingredients(["butter", "parmesan"])
    assert len(results) >= 1
    assert any(r["title"] == "Butter Pasta" for r in results)


def test_search_recipes_respects_limit(store_with_recipes):
    """The limit argument caps the number of returned rows."""
    results = store_with_recipes.search_recipes_by_ingredients(["butter"], limit=1)
    assert len(results) <= 1


def test_check_rate_limit_first_call(store_with_recipes):
    """First call under the daily cap is allowed and counted as 1."""
    allowed, count = store_with_recipes.check_and_increment_rate_limit("leftover_mode", daily_max=5)
    assert allowed is True
    assert count == 1


def test_check_rate_limit_exceeded(store_with_recipes):
    """The sixth call against daily_max=5 is denied; the count stays at the cap."""
    for _ in range(5):
        store_with_recipes.check_and_increment_rate_limit("leftover_mode", daily_max=5)
    allowed, count = store_with_recipes.check_and_increment_rate_limit("leftover_mode", daily_max=5)
    assert allowed is False
    assert count == 5

View file

View file

@ -0,0 +1,18 @@
def test_parse_flavorgraph_node():
    """parse_ingredient_nodes maps ingredient names to their linked compound ids."""
    from scripts.pipeline.build_flavorgraph_index import parse_ingredient_nodes
    sample = {
        "nodes": [
            {"id": "I_beef", "type": "ingredient", "name": "beef"},
            {"id": "C_pyrazine", "type": "compound", "name": "pyrazine"},
            {"id": "I_mushroom", "type": "ingredient", "name": "mushroom"},
        ],
        "links": [
            {"source": "I_beef", "target": "C_pyrazine"},
            {"source": "I_mushroom", "target": "C_pyrazine"},
        ]
    }
    result = parse_ingredient_nodes(sample)
    # Both ingredients share the pyrazine compound edge.
    assert "beef" in result
    assert "C_pyrazine" in result["beef"]
    assert "mushroom" in result
    assert "C_pyrazine" in result["mushroom"]

View file

@ -0,0 +1,23 @@
import pytest
from pathlib import Path
import sys
sys.path.insert(0, str(Path(__file__).parents[2]))
def test_normalize_ingredient_name():
    """normalize_name lowercases, trims, and strips parentheticals/qualifiers."""
    from scripts.pipeline.build_ingredient_index import normalize_name
    assert normalize_name("Ground Beef (85% lean)") == "ground beef"
    assert normalize_name(" Olive Oil ") == "olive oil"
    assert normalize_name("Cheddar Cheese, shredded") == "cheddar cheese"


def test_derive_elements_from_usda_row():
    """High fat yields Richness; notable glutamate yields Depth."""
    from scripts.pipeline.build_ingredient_index import derive_elements
    row = {"fat_pct": 20.0, "protein_pct": 17.0, "moisture_pct": 60.0,
           "sodium_mg_per_100g": 65.0, "glutamate_mg": 2.8, "starch_pct": 0.0}
    elements = derive_elements(row)
    assert "Richness" in elements  # high fat
    assert "Depth" in elements  # notable glutamate


def test_derive_binding_score():
    """Flour-like macros score the max binding (3); water-like macros score 0."""
    from scripts.pipeline.build_ingredient_index import derive_binding_score
    assert derive_binding_score({"protein_pct": 12.0, "starch_pct": 68.0}) == 3  # flour
    assert derive_binding_score({"protein_pct": 1.0, "starch_pct": 0.5}) == 0  # water

View file

@ -0,0 +1,19 @@
def test_extract_ingredient_names():
    """extract_ingredient_names strips quantities and units down to bare names."""
    from scripts.pipeline.build_recipe_index import extract_ingredient_names
    raw = ["2 cups all-purpose flour", "1 lb ground beef (85/15)", "salt to taste"]
    names = extract_ingredient_names(raw)
    assert "flour" in names or "all-purpose flour" in names
    assert "ground beef" in names
    assert "salt" in names


def test_compute_element_coverage():
    """Coverage is positive for elements present in any profile and 0 otherwise."""
    from scripts.pipeline.build_recipe_index import compute_element_coverage
    profiles = [
        {"elements": ["Richness", "Depth"]},
        {"elements": ["Brightness"]},
        {"elements": ["Seasoning"]},
    ]
    coverage = compute_element_coverage(profiles)
    assert coverage["Richness"] > 0
    assert coverage["Brightness"] > 0
    assert coverage.get("Aroma", 0) == 0

View file

@ -0,0 +1,10 @@
def test_diff_ingredient_lists():
    """diff_ingredients reports removed/added items; shared items appear in neither."""
    from scripts.pipeline.derive_substitutions import diff_ingredients
    base = ["ground beef", "chicken broth", "olive oil", "onion"]
    target = ["lentils", "vegetable broth", "olive oil", "onion"]
    removed, added = diff_ingredients(base, target)
    assert "ground beef" in removed
    assert "chicken broth" in removed
    assert "lentils" in added
    assert "vegetable broth" in added
    assert "olive oil" not in removed  # unchanged

View file

View file

View file

@ -0,0 +1,69 @@
import pytest
import sqlite3
import json
import tempfile
from pathlib import Path
from app.db.store import Store
@pytest.fixture
def store_with_profiles(tmp_path):
    """File-backed Store seeded with butter and parmesan ingredient profiles."""
    db_path = tmp_path / "test.db"
    store = Store(db_path)
    # Seed ingredient_profiles
    store.conn.execute("""
        INSERT INTO ingredient_profiles
        (name, elements, fat_pct, moisture_pct, glutamate_mg, binding_score,
         sodium_mg_per_100g, is_fermented, texture_profile)
        VALUES (?,?,?,?,?,?,?,?,?)
    """, ("butter", json.dumps(["Richness"]), 81.0, 16.0, 0.1, 0, 11.0, 0, "creamy"))
    store.conn.execute("""
        INSERT INTO ingredient_profiles
        (name, elements, fat_pct, moisture_pct, glutamate_mg, binding_score,
         sodium_mg_per_100g, is_fermented, texture_profile)
        VALUES (?,?,?,?,?,?,?,?,?)
    """, ("parmesan", json.dumps(["Depth", "Seasoning"]), 29.0, 29.0, 1.2, 1, 1600.0, 0, "neutral"))
    store.conn.commit()
    return store
def test_classify_known_ingredient(store_with_profiles):
    """A DB-backed ingredient returns its stored elements/macros with source='db'."""
    from app.services.recipe.element_classifier import ElementClassifier
    clf = ElementClassifier(store_with_profiles)
    profile = clf.classify("butter")
    assert "Richness" in profile.elements
    assert profile.fat_pct == pytest.approx(81.0)
    assert profile.name == "butter"
    assert profile.source == "db"


def test_classify_unknown_ingredient_uses_heuristic(store_with_profiles):
    """Ingredients missing from the DB fall back to name-based heuristics."""
    from app.services.recipe.element_classifier import ElementClassifier
    clf = ElementClassifier(store_with_profiles)
    profile = clf.classify("ghost pepper hot sauce")
    # Heuristic should detect acid / aroma
    assert "Aroma" in profile.elements  # "pepper" in name matches Aroma heuristic
    assert profile.name == "ghost pepper hot sauce"


def test_classify_batch(store_with_profiles):
    """classify_batch preserves input order and length."""
    from app.services.recipe.element_classifier import ElementClassifier
    clf = ElementClassifier(store_with_profiles)
    results = clf.classify_batch(["butter", "parmesan", "unknown herb"])
    assert len(results) == 3
    assert results[0].name == "butter"
    assert results[1].name == "parmesan"


def test_identify_gaps(store_with_profiles):
    """identify_gaps flags elements absent from the combined profiles."""
    from app.services.recipe.element_classifier import ElementClassifier
    clf = ElementClassifier(store_with_profiles)
    profiles = [
        clf.classify("butter"),
        clf.classify("parmesan"),
    ]
    gaps = clf.identify_gaps(profiles)
    # We have Richness + Depth + Seasoning; should flag Brightness, Aroma, Structure, Texture
    assert "Brightness" in gaps
    assert "Richness" not in gaps

View file

@ -0,0 +1,141 @@
"""Tests for LLMRecipeGenerator — prompt builders and allergy filtering."""
from __future__ import annotations
import pytest
from app.models.schemas.recipe import RecipeRequest
from app.services.recipe.element_classifier import IngredientProfile
def _make_store():
    """Build a bare Store over an in-memory SQLite connection."""
    import sqlite3
    from app.db.store import Store
    db = sqlite3.connect(":memory:")
    db.row_factory = sqlite3.Row
    # __new__ bypasses Store.__init__, so no schema setup or file I/O runs.
    bare = Store.__new__(Store)
    bare.conn = db
    return bare
def test_build_level3_prompt_contains_element_scaffold():
    """Level 3 prompt includes element coverage, pantry items, and constraints."""
    from app.services.recipe.llm_recipe import LLMRecipeGenerator
    store = _make_store()
    gen = LLMRecipeGenerator(store)
    req = RecipeRequest(
        pantry_items=["butter", "mushrooms"],
        level=3,
        constraints=["vegetarian"],
    )
    profiles = [
        IngredientProfile(name="butter", elements=["Richness"]),
        IngredientProfile(name="mushrooms", elements=["Depth"]),
    ]
    gaps = ["Brightness", "Aroma"]
    prompt = gen.build_level3_prompt(req, profiles, gaps)
    # Elements from both profiles and the gap list must all surface in the prompt.
    assert "Richness" in prompt
    assert "Depth" in prompt
    assert "Brightness" in prompt
    assert "butter" in prompt
    assert "vegetarian" in prompt


def test_build_level4_prompt_contains_pantry_and_constraints():
    """Level 4 prompt is concise and includes key context."""
    from app.services.recipe.llm_recipe import LLMRecipeGenerator
    store = _make_store()
    gen = LLMRecipeGenerator(store)
    req = RecipeRequest(
        pantry_items=["pasta", "eggs", "mystery ingredient"],
        level=4,
        constraints=["no gluten"],
        allergies=["gluten"],
        wildcard_confirmed=True,
    )
    prompt = gen.build_level4_prompt(req)
    assert "mystery" in prompt.lower()
    assert "gluten" in prompt.lower()
    # Wildcard prompts are deliberately short.
    assert len(prompt) < 1500
def test_allergy_items_excluded_from_prompt():
    """Allergy items are listed as forbidden AND filtered from pantry shown to LLM."""
    from app.services.recipe.llm_recipe import LLMRecipeGenerator
    store = _make_store()
    gen = LLMRecipeGenerator(store)
    req = RecipeRequest(
        pantry_items=["olive oil", "peanuts", "garlic"],
        level=3,
        constraints=[],
        allergies=["peanuts"],
    )
    profiles = [
        IngredientProfile(name="olive oil", elements=["Richness"]),
        IngredientProfile(name="peanuts", elements=["Texture"]),
        IngredientProfile(name="garlic", elements=["Aroma"]),
    ]
    gaps: list[str] = []
    prompt = gen.build_level3_prompt(req, profiles, gaps)
    # Check peanuts are in the exclusion section but NOT in the pantry section
    # (relies on the prompt's "Pantry..." and "...must not..." line prefixes).
    lines = prompt.split("\n")
    pantry_line = next((l for l in lines if l.startswith("Pantry")), "")
    exclusion_line = next(
        (l for l in lines if "must not" in l.lower()),
        "",
    )
    assert "peanuts" not in pantry_line.lower()
    assert "peanuts" in exclusion_line.lower()
    assert "olive oil" in prompt.lower()


def test_generate_returns_result_when_llm_responds(monkeypatch):
    """generate() returns RecipeResult with title when LLM returns a valid response."""
    from app.services.recipe.llm_recipe import LLMRecipeGenerator
    from app.models.schemas.recipe import RecipeResult
    store = _make_store()
    gen = LLMRecipeGenerator(store)
    canned_response = (
        "Title: Mushroom Butter Pasta\n"
        "Ingredients: butter, mushrooms, pasta\n"
        "Directions: Cook pasta. Sauté mushrooms in butter. Combine.\n"
        "Notes: Add parmesan to taste.\n"
    )
    # Stub the backend call so no LLM is needed.
    monkeypatch.setattr(gen, "_call_llm", lambda prompt: canned_response)
    req = RecipeRequest(
        pantry_items=["butter", "mushrooms", "pasta"],
        level=3,
        constraints=["vegetarian"],
    )
    profiles = [
        IngredientProfile(name="butter", elements=["Richness"]),
        IngredientProfile(name="mushrooms", elements=["Depth"]),
    ]
    gaps = ["Brightness"]
    result = gen.generate(req, profiles, gaps)
    assert isinstance(result, RecipeResult)
    assert len(result.suggestions) == 1
    suggestion = result.suggestions[0]
    assert suggestion.title == "Mushroom Butter Pasta"
    # NOTE(review): asserting that "butter" lands in missing_ingredients even
    # though it is in the pantry looks odd — presumably the generator reports
    # every parsed LLM ingredient as "missing"; confirm intended semantics.
    assert "butter" in suggestion.missing_ingredients
    assert len(suggestion.directions) > 0
    assert "parmesan" in suggestion.notes.lower()
    assert result.element_gaps == ["Brightness"]

View file

@ -0,0 +1,121 @@
import pytest, json
from tests.services.recipe.test_element_classifier import store_with_profiles
from tests.db.test_store_recipes import store_with_recipes
def test_level1_returns_ranked_suggestions(store_with_recipes):
    """Level 1 ranks the best ingredient-overlap recipe first."""
    from app.services.recipe.recipe_engine import RecipeEngine, RecipeRequest
    engine = RecipeEngine(store_with_recipes)
    req = RecipeRequest(
        pantry_items=["butter", "parmesan"],
        level=1,
        constraints=[],
    )
    result = engine.suggest(req)
    assert len(result.suggestions) > 0
    assert result.suggestions[0].title == "Butter Pasta"


def test_level1_expiry_first_requires_rate_limit_free(store_with_recipes):
    """Free-tier expiry-first mode allows 5 calls, then flags rate_limited."""
    from app.services.recipe.recipe_engine import RecipeEngine, RecipeRequest
    engine = RecipeEngine(store_with_recipes)
    for _ in range(5):
        req = RecipeRequest(
            pantry_items=["butter"],
            level=1,
            constraints=[],
            expiry_first=True,
            tier="free",
        )
        result = engine.suggest(req)
        assert result.rate_limited is False
    # Sixth call crosses the daily cap.
    req = RecipeRequest(
        pantry_items=["butter"],
        level=1,
        constraints=[],
        expiry_first=True,
        tier="free",
    )
    result = engine.suggest(req)
    assert result.rate_limited is True
def test_level2_returns_swap_candidates(store_with_recipes):
    """Level 2 with a vegan constraint surfaces substitution swap candidates."""
    from app.services.recipe.recipe_engine import RecipeEngine, RecipeRequest
    # Seed one substitution pair so a swap is possible.
    store_with_recipes.conn.execute("""
        INSERT INTO substitution_pairs
        (original_name, substitute_name, constraint_label, fat_delta, occurrence_count)
        VALUES (?,?,?,?,?)
    """, ("butter", "coconut oil", "vegan", -1.0, 12))
    store_with_recipes.conn.commit()
    engine = RecipeEngine(store_with_recipes)
    req = RecipeRequest(
        pantry_items=["butter", "parmesan"],
        level=2,
        constraints=["vegan"],
    )
    result = engine.suggest(req)
    swapped = [s for s in result.suggestions if s.swap_candidates]
    assert len(swapped) > 0


def test_element_gaps_reported(store_with_recipes):
    """The result always carries an element_gaps list."""
    from app.services.recipe.recipe_engine import RecipeEngine, RecipeRequest
    engine = RecipeEngine(store_with_recipes)
    req = RecipeRequest(pantry_items=["butter"], level=1, constraints=[])
    result = engine.suggest(req)
    assert isinstance(result.element_gaps, list)


def test_grocery_list_max_missing(store_with_recipes):
    """max_missing bounds missing_ingredients per suggestion; grocery_list is a list."""
    from app.services.recipe.recipe_engine import RecipeEngine, RecipeRequest
    engine = RecipeEngine(store_with_recipes)
    # Butter Pasta needs butter, pasta, parmesan. We have only butter → missing 2
    req = RecipeRequest(
        pantry_items=["butter"],
        level=1,
        constraints=[],
        max_missing=2,
    )
    result = engine.suggest(req)
    assert all(len(s.missing_ingredients) <= 2 for s in result.suggestions)
    assert isinstance(result.grocery_list, list)
def test_hard_day_mode_filters_complex_methods(store_with_recipes):
    """hard_day_mode drops recipes whose directions classify as 'involved'."""
    from app.services.recipe.recipe_engine import RecipeEngine, RecipeRequest, _classify_method_complexity
    # Test the classifier directly
    assert _classify_method_complexity(["mix all ingredients", "stir to combine"]) == "easy"
    assert _classify_method_complexity(["sauté onions", "braise for 2 hours"]) == "involved"
    # With hard_day_mode, involved recipes should be filtered out
    # Seed a hard recipe into the store
    store_with_recipes.conn.execute("""
        INSERT INTO recipes (external_id, title, ingredients, ingredient_names,
                             directions, category, keywords, element_coverage)
        VALUES (?,?,?,?,?,?,?,?)
    """, ("99", "Braised Short Ribs",
          '["butter","beef ribs"]', '["butter","beef ribs"]',
          '["braise short ribs for 3 hours","reduce sauce"]',
          "Meat", '[]', '{"Richness":0.8}'))
    store_with_recipes.conn.commit()
    engine = RecipeEngine(store_with_recipes)
    req_hard = RecipeRequest(pantry_items=["butter"], level=1, constraints=[], hard_day_mode=True)
    result = engine.suggest(req_hard)
    titles = [s.title for s in result.suggestions]
    assert "Braised Short Ribs" not in titles


def test_grocery_links_free_tier(store_with_recipes):
    """grocery_links is a list of link objects (may be empty without retailer config)."""
    from app.services.recipe.recipe_engine import RecipeEngine, RecipeRequest
    engine = RecipeEngine(store_with_recipes)
    req = RecipeRequest(pantry_items=["butter"], level=1, constraints=[], max_missing=5)
    result = engine.suggest(req)
    # Links may be empty if no retailer env vars set, but structure must be correct
    assert isinstance(result.grocery_links, list)
    for link in result.grocery_links:
        assert hasattr(link, "ingredient")
        assert hasattr(link, "retailer")
        assert hasattr(link, "url")

View file

@ -0,0 +1,48 @@
def test_seitan_staple_has_yield_formats():
    """Seitan exposes both fresh and frozen yield formats."""
    from app.services.recipe.staple_library import StapleLibrary
    lib = StapleLibrary()
    seitan = lib.get("seitan")
    assert seitan is not None
    assert "fresh" in seitan.yield_formats
    assert "frozen" in seitan.yield_formats


def test_staple_yield_format_has_elements():
    """Each yield format carries an elements list (fresh seitan → Structure)."""
    from app.services.recipe.staple_library import StapleLibrary
    lib = StapleLibrary()
    seitan = lib.get("seitan")
    fresh = seitan.yield_formats["fresh"]
    assert "Structure" in fresh["elements"]


def test_list_all_staples():
    """list_all includes the seitan and tempeh staples."""
    from app.services.recipe.staple_library import StapleLibrary
    lib = StapleLibrary()
    all_staples = lib.list_all()
    slugs = [s.slug for s in all_staples]
    assert "seitan" in slugs
    assert "tempeh" in slugs


def test_tofu_firm_is_loadable():
    """tofu_firm loads with a matching slug."""
    from app.services.recipe.staple_library import StapleLibrary
    lib = StapleLibrary()
    tofu = lib.get("tofu_firm")
    assert tofu is not None
    assert tofu.slug == "tofu_firm"


def test_filter_by_dietary_vegan():
    """The dietary filter returns only staples labeled vegan."""
    from app.services.recipe.staple_library import StapleLibrary
    lib = StapleLibrary()
    vegan = lib.filter_by_dietary("vegan")
    assert len(vegan) > 0
    assert all("vegan" in s.dietary_labels for s in vegan)


def test_list_all_returns_all_three():
    """The library ships exactly seitan, tempeh, and tofu_firm."""
    from app.services.recipe.staple_library import StapleLibrary
    lib = StapleLibrary()
    all_staples = lib.list_all()
    slugs = {s.slug for s in all_staples}
    assert {"seitan", "tempeh", "tofu_firm"} == slugs

View file

@ -0,0 +1,75 @@
from app.services.recipe.style_adapter import StyleAdapter
# --- Spec-required tests ---
def test_italian_style_biases_aromatics():
    """Garlic and onion appear when they're in both pantry and italian aromatics."""
    adapter = StyleAdapter()
    italian = adapter.get("italian")
    pantry = ["garlic", "onion", "ginger"]
    result = italian.bias_aroma_selection(pantry)
    assert "garlic" in result
    assert "onion" in result


def test_east_asian_method_weights_sum_to_one():
    """East Asian method_bias weights sum to ~1.0."""
    adapter = StyleAdapter()
    east_asian = adapter.get("east_asian")
    weights = east_asian.method_weights()
    assert abs(sum(weights.values()) - 1.0) < 1e-6


def test_style_adapter_loads_all_five_styles():
    """Adapter discovers all 5 cuisine YAML files."""
    adapter = StyleAdapter()
    assert len(adapter.styles) == 5


# --- Additional coverage ---
def test_load_italian_style():
    """The italian style loads and carries at least one signature herb."""
    adapter = StyleAdapter()
    italian = adapter.get("italian")
    assert italian is not None
    assert "basil" in italian.aromatics or "oregano" in italian.aromatics


def test_bias_aroma_selection_excludes_non_style_items():
    """bias_aroma_selection does not include items not in the style's aromatics."""
    adapter = StyleAdapter()
    italian = adapter.get("italian")
    pantry = ["butter", "parmesan", "basil", "cumin", "soy sauce"]
    result = italian.bias_aroma_selection(pantry)
    assert "basil" in result
    assert "soy sauce" not in result
    assert "cumin" not in result


def test_preferred_depth_sources():
    """preferred_depth_sources returns only depth sources present in pantry."""
    adapter = StyleAdapter()
    italian = adapter.get("italian")
    pantry = ["parmesan", "olive oil", "pasta"]
    result = italian.preferred_depth_sources(pantry)
    assert "parmesan" in result
    assert "olive oil" not in result
def test_bias_aroma_selection_adapter_method():
    """StyleAdapter.bias_aroma_selection returns italian-biased items."""
    adapter = StyleAdapter()
    pantry = ["butter", "parmesan", "basil", "cumin", "soy sauce"]
    biased = adapter.bias_aroma_selection("italian", pantry)
    assert "basil" in biased
    # Fix: the old check (`"soy sauce" not in biased or "basil" in biased`) was
    # a tautology — the second clause was already asserted above, so the
    # exclusion was never actually verified. Assert it directly, matching
    # test_bias_aroma_selection_excludes_non_style_items on the style object.
    assert "soy sauce" not in biased
def test_list_all_styles():
    """list_all exposes every loaded style id, including the three checked here."""
    adapter = StyleAdapter()
    styles = adapter.list_all()
    style_ids = [s.style_id for s in styles]
    assert "italian" in style_ids
    assert "latin" in style_ids
    assert "east_asian" in style_ids

View file

@ -0,0 +1,45 @@
import json, pytest
from tests.services.recipe.test_element_classifier import store_with_profiles
@pytest.fixture
def store_with_subs(store_with_profiles):
    """Seed two vegan substitution pairs: butter→coconut oil, ground beef→lentils."""
    store_with_profiles.conn.execute("""
        INSERT INTO substitution_pairs
        (original_name, substitute_name, constraint_label,
         fat_delta, moisture_delta, glutamate_delta, protein_delta, occurrence_count)
        VALUES (?,?,?,?,?,?,?,?)
    """, ("butter", "coconut oil", "vegan", -1.0, 0.0, 0.0, 0.0, 15))
    store_with_profiles.conn.execute("""
        INSERT INTO substitution_pairs
        (original_name, substitute_name, constraint_label,
         fat_delta, moisture_delta, glutamate_delta, protein_delta, occurrence_count)
        VALUES (?,?,?,?,?,?,?,?)
    """, ("ground beef", "lentils", "vegan", -15.0, 10.0, -2.0, 5.0, 45))
    store_with_profiles.conn.commit()
    return store_with_profiles
def test_find_substitutes_for_constraint(store_with_subs):
    """The vegan constraint on butter yields coconut oil as the top substitute."""
    from app.services.recipe.substitution_engine import SubstitutionEngine
    engine = SubstitutionEngine(store_with_subs)
    results = engine.find_substitutes("butter", constraint="vegan")
    assert len(results) > 0
    assert results[0].substitute_name == "coconut oil"


def test_compensation_hints_for_large_delta(store_with_subs):
    """A large negative fat delta triggers a Richness compensation hint."""
    from app.services.recipe.substitution_engine import SubstitutionEngine
    engine = SubstitutionEngine(store_with_subs)
    results = engine.find_substitutes("ground beef", constraint="vegan")
    assert len(results) > 0
    swap = results[0]
    # Fat delta is -15g — should suggest a Richness compensation
    assert any(h["element"] == "Richness" for h in swap.compensation_hints)


def test_no_substitutes_returns_empty(store_with_subs):
    """An unknown ingredient yields an empty result list."""
    from app.services.recipe.substitution_engine import SubstitutionEngine
    engine = SubstitutionEngine(store_with_subs)
    results = engine.find_substitutes("unobtainium", constraint="vegan")
    assert results == []

20
tests/test_tiers.py Normal file
View file

@ -0,0 +1,20 @@
from app.tiers import can_use
def test_leftover_mode_free_tier():
    """Leftover mode is now available to free users (rate-limited at API layer, not hard-gated)."""
    assert can_use("leftover_mode", "free") is True


def test_style_picker_requires_paid():
    """Style picker is gated to the paid tier."""
    assert can_use("style_picker", "free") is False
    assert can_use("style_picker", "paid") is True


def test_staple_library_is_free():
    """The staple library is available on the free tier."""
    assert can_use("staple_library", "free") is True
def test_recipe_suggestions_byok_unlockable():
assert can_use("recipe_suggestions", "free", has_byok=False) is False
assert can_use("recipe_suggestions", "free", has_byok=True) is True