"""
Import food.com recipe corpus into recipes table.

Usage:
    conda run -n job-seeker python scripts/pipeline/build_recipe_index.py \
        --db /path/to/kiwi.db \
        --recipes data/recipes_foodcom.parquet \
        --batch-size 10000
"""
from __future__ import annotations

import argparse
import json
import re
import sqlite3
from pathlib import Path

import pandas as pd

# Leading "<quantity> <unit>" prefix, e.g. "2 cups", "1/2 tsp". The character
# class also covers unicode vulgar fractions (1/4, 1/2, 3/4, 1/3, 2/3).
_MEASURE_PATTERN = re.compile(
    r"^\d[\d\s/\u00bc\u00bd\u00be\u2153\u2154]*\s*"
    r"(cup|tbsp|tsp|oz|lb|g|kg|ml|l|clove|slice|piece|can|pkg|package|bunch"
    r"|head|stalk|sprig|pinch|dash|to taste|as needed)s?\b",
    re.IGNORECASE,
)
# Bare leading quantity with no recognized unit, e.g. "3 eggs".
_LEAD_NUMBER = re.compile(r"^\d[\d\s/\u00bc\u00bd\u00be\u2153\u2154]*\s*")
# Trailing qualifiers that are not part of the ingredient name itself.
_TRAILING_QUALIFIER = re.compile(
    r"\s*(to taste|as needed|or more|or less|optional|if desired|if needed)\s*$",
    re.IGNORECASE,
)


def extract_ingredient_names(raw_list: list[str]) -> list[str]:
    """Strip quantities, units, and qualifiers from raw ingredient strings.

    Args:
        raw_list: Ingredient lines as they appear in the corpus,
            e.g. ``"2 cups all-purpose flour"``.

    Returns:
        Lower-cased ingredient names with leading measures, parentheticals,
        comma-tails, and trailing qualifiers removed. Entries that reduce to
        one character or fewer are dropped.
    """
    names: list[str] = []
    for raw in raw_list:
        s = raw.lower().strip()
        s = _MEASURE_PATTERN.sub("", s)
        s = _LEAD_NUMBER.sub("", s)
        s = re.sub(r"\(.*?\)", "", s)  # drop parentheticals, e.g. "(85/15)"
        s = re.sub(r",.*$", "", s)     # drop everything after the first comma
        s = _TRAILING_QUALIFIER.sub("", s)
        s = s.strip(" -.,")
        if len(s) > 1:  # len > 1 already implies non-empty
            names.append(s)
    return names


def compute_element_coverage(profiles: list[dict]) -> dict[str, float]:
    """Return the fraction of *profiles* containing each flavor element.

    Args:
        profiles: Dicts with an optional ``"elements"`` list.

    Returns:
        Mapping of element name -> fraction of profiles that list it,
        rounded to 3 decimals. Empty dict when *profiles* is empty.
    """
    if not profiles:
        return {}
    counts: dict[str, int] = {}
    for profile in profiles:
        for elem in profile.get("elements", []):
            counts[elem] = counts.get(elem, 0) + 1
    total = len(profiles)
    return {elem: round(n / total, 3) for elem, n in counts.items()}


# Column order must match the tuple built per recipe in build() below.
_INSERT_SQL = """
    INSERT OR IGNORE INTO recipes
        (external_id, title, ingredients, ingredient_names, directions,
         category, keywords, calories, fat_g, protein_g, sodium_mg, element_coverage)
    VALUES (?,?,?,?,?,?,?,?,?,?,?,?)
"""


def _coerce_json_list(value) -> list[str]:
    """Normalize a parquet cell that may hold a JSON string, a list/array, or None.

    Uses ``is None`` rather than truthiness: parquet list columns can surface
    as numpy arrays, whose truth value is ambiguous.
    """
    if isinstance(value, str):
        try:
            value = json.loads(value)
        except Exception:
            # Not JSON -- treat the whole string as a single entry.
            value = [value]
    if value is None:
        return []
    return [str(item) for item in value]


def _nullable_float(value) -> float | None:
    """Coerce a nutrition cell to float, mapping missing/zero values to NULL."""
    return float(value or 0) or None


def build(db_path: Path, recipes_path: Path, batch_size: int = 10000) -> None:
    """Import the food.com parquet corpus into the ``recipes`` table.

    Args:
        db_path: SQLite database containing ``recipes`` and
            ``ingredient_profiles`` tables.
        recipes_path: Parquet file with the food.com recipe columns
            (RecipeId, Name, RecipeIngredientParts, RecipeInstructions, ...).
        batch_size: Rows buffered between ``executemany`` flushes.

    Duplicate external_ids are skipped via INSERT OR IGNORE. Progress is
    printed after each flushed batch.
    """
    conn = sqlite3.connect(db_path)
    conn.execute("PRAGMA journal_mode=WAL")
    df = pd.read_parquet(recipes_path)
    inserted = 0
    batch: list[tuple] = []
    # Ingredient names repeat heavily across recipes; memoize profile lookups
    # so each distinct name hits the DB at most once.
    profile_cache: dict[str, dict | None] = {}

    def lookup_profile(name: str) -> dict | None:
        if name not in profile_cache:
            found = conn.execute(
                "SELECT elements FROM ingredient_profiles WHERE name = ?", (name,)
            ).fetchone()
            profile_cache[name] = {"elements": json.loads(found[0])} if found else None
        return profile_cache[name]

    def flush() -> int:
        """Write the pending batch, commit, and return the number written."""
        conn.executemany(_INSERT_SQL, batch)
        conn.commit()
        return len(batch)

    try:
        for _, row in df.iterrows():
            raw_ingredients = _coerce_json_list(row.get("RecipeIngredientParts", []))
            ingredient_names = extract_ingredient_names(raw_ingredients)

            profiles = [p for p in map(lookup_profile, ingredient_names) if p]
            coverage = compute_element_coverage(profiles)

            directions = _coerce_json_list(row.get("RecipeInstructions", []))
            keywords = _coerce_json_list(row.get("Keywords", []))

            batch.append((
                str(row.get("RecipeId", "")),
                str(row.get("Name", ""))[:500],  # guard against oversized titles
                json.dumps(raw_ingredients),
                json.dumps(ingredient_names),
                json.dumps(directions),
                str(row.get("RecipeCategory", "") or ""),
                json.dumps(keywords),
                _nullable_float(row.get("Calories")),
                _nullable_float(row.get("FatContent")),
                _nullable_float(row.get("ProteinContent")),
                _nullable_float(row.get("SodiumContent")),
                json.dumps(coverage),
            ))

            if len(batch) >= batch_size:
                inserted += flush()
                print(f"  {inserted} recipes inserted...")
                batch = []

        if batch:
            inserted += flush()
    finally:
        conn.close()

    print(f"Total: {inserted} recipes inserted")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--db", required=True, type=Path)
    parser.add_argument("--recipes", required=True, type=Path)
    parser.add_argument("--batch-size", type=int, default=10000)
    args = parser.parse_args()
    build(args.db, args.recipes, args.batch_size)
"""
Derive substitution pairs by diffing lishuyang/recipepairs.

GPL-3.0 source -- derived annotations only, raw pairs not shipped.

Usage:
    conda run -n job-seeker python scripts/pipeline/derive_substitutions.py \
        --db /path/to/kiwi.db \
        --recipepairs data/recipepairs.parquet \
        --recipes data/recipes_foodcom.parquet
"""
from __future__ import annotations

import argparse
import json
import sqlite3
from collections import defaultdict
from pathlib import Path

import pandas as pd

# NOTE(review): the original version imported extract_ingredient_names from
# build_recipe_index but never used it -- ingredient names are read from the
# recipes table already normalized. The dead import was removed.

# Constraint flag columns expected in the recipepairs parquet.
CONSTRAINT_COLS = ["vegan", "vegetarian", "dairy_free", "low_calorie",
                   "low_carb", "low_fat", "low_sodium", "gluten_free"]

# A (original, substitute, constraint) triple must be observed at least this
# many times before it is persisted.
MIN_OCCURRENCES = 3


def diff_ingredients(base: list[str], target: list[str]) -> tuple[list[str], list[str]]:
    """Set-diff two ingredient lists.

    Args:
        base: Ingredient names of the base recipe.
        target: Ingredient names of the target recipe.

    Returns:
        ``(removed, added)`` where *removed* are names only in *base* and
        *added* are names only in *target*. Set semantics: duplicates
        collapse and ordering within each list is unspecified.
    """
    base_set, target_set = set(base), set(target)
    return list(base_set - target_set), list(target_set - base_set)


def _get_profile(conn: sqlite3.Connection, name: str, cache: dict[str, dict]) -> dict:
    """Fetch flavor-profile numbers for *name*, memoized in *cache*.

    Missing ingredients (and NULL columns) fall back to zeros so delta
    computation below never sees None.
    """
    if name not in cache:
        found = conn.execute(
            "SELECT fat_pct, moisture_pct, glutamate_mg, protein_pct "
            "FROM ingredient_profiles WHERE name = ?", (name,)
        ).fetchone()
        if found:
            cache[name] = {"fat": found[0] or 0, "moisture": found[1] or 0,
                           "glutamate": found[2] or 0, "protein": found[3] or 0}
        else:
            cache[name] = {"fat": 0, "moisture": 0, "glutamate": 0, "protein": 0}
    return cache[name]


def build(db_path: Path, recipepairs_path: Path, recipes_path: Path) -> None:
    """Diff recipe pairs and persist single-swap substitutions.

    Args:
        db_path: SQLite DB with ``recipes``, ``ingredient_profiles``, and
            ``substitution_pairs`` tables.
        recipepairs_path: Parquet of (base, target) recipe-id pairs carrying
            boolean constraint columns (see CONSTRAINT_COLS).
        recipes_path: Accepted for CLI symmetry with build_recipe_index;
            currently unused -- ingredient names come from the DB.

    Only pairs differing by exactly one removed + one added ingredient, seen
    at least MIN_OCCURRENCES times per constraint, are written.
    """
    conn = sqlite3.connect(db_path)
    inserted = 0
    try:
        print("Loading recipe ingredient index...")
        recipe_ingredients: dict[str, list[str]] = {
            str(ext_id): json.loads(names_json)
            for ext_id, names_json in conn.execute(
                "SELECT external_id, ingredient_names FROM recipes"
            )
        }

        df = pd.read_parquet(recipepairs_path)
        pair_counts: dict[tuple, int] = defaultdict(int)

        print("Diffing recipe pairs...")
        for _, row in df.iterrows():
            base_ings = recipe_ingredients.get(str(row.get("base", "")), [])
            target_ings = recipe_ingredients.get(str(row.get("target", "")), [])
            if not base_ings or not target_ings:
                continue

            removed, added = diff_ingredients(base_ings, target_ings)
            # Only an unambiguous one-out/one-in swap counts as a substitution.
            if len(removed) != 1 or len(added) != 1:
                continue

            original, substitute = removed[0], added[0]
            for constraint in CONSTRAINT_COLS:
                if row.get(constraint, 0):
                    pair_counts[(original, substitute, constraint)] += 1

        print("Writing substitution pairs...")
        # Memoize profile lookups: the same ingredient name recurs across
        # many pairs, and the original re-queried the DB every time.
        profile_cache: dict[str, dict] = {}
        for (original, substitute, constraint), count in pair_counts.items():
            if count < MIN_OCCURRENCES:
                continue
            p_orig = _get_profile(conn, original, profile_cache)
            p_sub = _get_profile(conn, substitute, profile_cache)
            conn.execute("""
                INSERT OR REPLACE INTO substitution_pairs
                    (original_name, substitute_name, constraint_label,
                     fat_delta, moisture_delta, glutamate_delta, protein_delta,
                     occurrence_count, source)
                VALUES (?,?,?,?,?,?,?,?,?)
            """, (
                original, substitute, constraint,
                round(p_sub["fat"] - p_orig["fat"], 2),
                round(p_sub["moisture"] - p_orig["moisture"], 2),
                round(p_sub["glutamate"] - p_orig["glutamate"], 2),
                round(p_sub["protein"] - p_orig["protein"], 2),
                count, "derived",
            ))
            inserted += 1

        conn.commit()
    finally:
        conn.close()

    print(f"Inserted {inserted} substitution pairs (min {MIN_OCCURRENCES} occurrences)")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--db", required=True, type=Path)
    parser.add_argument("--recipepairs", required=True, type=Path)
    parser.add_argument("--recipes", required=True, type=Path)
    args = parser.parse_args()
    build(args.db, args.recipepairs, args.recipes)
# tests/pipeline/test_build_recipe_index.py


def test_extract_ingredient_names():
    from scripts.pipeline.build_recipe_index import extract_ingredient_names

    raw_lines = [
        "2 cups all-purpose flour",
        "1 lb ground beef (85/15)",
        "salt to taste",
    ]
    names = extract_ingredient_names(raw_lines)
    # The measure stripper may or may not drop "all-purpose"; accept either.
    assert "flour" in names or "all-purpose flour" in names
    assert "ground beef" in names
    assert "salt" in names


def test_compute_element_coverage():
    from scripts.pipeline.build_recipe_index import compute_element_coverage

    sample_profiles = [
        {"elements": ["Richness", "Depth"]},
        {"elements": ["Brightness"]},
        {"elements": ["Seasoning"]},
    ]
    coverage = compute_element_coverage(sample_profiles)
    assert coverage["Richness"] > 0
    assert coverage["Brightness"] > 0
    # Elements absent from every profile must report zero coverage.
    assert coverage.get("Aroma", 0) == 0


# tests/pipeline/test_derive_substitutions.py


def test_diff_ingredient_lists():
    from scripts.pipeline.derive_substitutions import diff_ingredients

    before = ["ground beef", "chicken broth", "olive oil", "onion"]
    after = ["lentils", "vegetable broth", "olive oil", "onion"]
    removed, added = diff_ingredients(before, after)
    assert {"ground beef", "chicken broth"} <= set(removed)
    assert {"lentils", "vegetable broth"} <= set(added)
    # Shared ingredients must never be reported as removed.
    assert "olive oil" not in removed