From e44d36e32f7c273657ca75d60d6f62dca9a7a302 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Mon, 30 Mar 2026 23:10:52 -0700 Subject: [PATCH] =?UTF-8?q?fix:=20pipeline=20scripts=20=E2=80=94=20connect?= =?UTF-8?q?ion=20safety,=20remove=20unused=20recipes=5Fpath=20arg,=20fix?= =?UTF-8?q?=20inserted=20counter,=20pre-load=20profile=20index?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scripts/pipeline/build_recipe_index.py | 117 +++++++++++---------- scripts/pipeline/derive_substitutions.py | 123 +++++++++++------------ 2 files changed, 125 insertions(+), 115 deletions(-) diff --git a/scripts/pipeline/build_recipe_index.py b/scripts/pipeline/build_recipe_index.py index 7c6c171..78676fb 100644 --- a/scripts/pipeline/build_recipe_index.py +++ b/scripts/pipeline/build_recipe_index.py @@ -55,53 +55,74 @@ def compute_element_coverage(profiles: list[dict]) -> dict[str, float]: def build(db_path: Path, recipes_path: Path, batch_size: int = 10000) -> None: conn = sqlite3.connect(db_path) - conn.execute("PRAGMA journal_mode=WAL") - df = pd.read_parquet(recipes_path) - inserted = 0 - batch = [] + try: + conn.execute("PRAGMA journal_mode=WAL") - for _, row in df.iterrows(): - raw_ingredients = row.get("RecipeIngredientParts", []) - if isinstance(raw_ingredients, str): + # Pre-load ingredient element profiles to avoid N+1 queries + profile_index: dict[str, list[str]] = {} + for row in conn.execute("SELECT name, elements FROM ingredient_profiles"): try: - raw_ingredients = json.loads(raw_ingredients) + profile_index[row[0]] = json.loads(row[1]) except Exception: - raw_ingredients = [raw_ingredients] - raw_ingredients = [str(i) for i in (raw_ingredients or [])] - ingredient_names = extract_ingredient_names(raw_ingredients) + pass - profiles = [] - for name in ingredient_names: - row_p = conn.execute( - "SELECT elements FROM ingredient_profiles WHERE name = ?", (name,) - ).fetchone() - if row_p: - profiles.append({"elements": json.loads(row_p[0])}) - coverage = compute_element_coverage(profiles) + df = pd.read_parquet(recipes_path) + inserted = 0 + batch = [] - directions = row.get("RecipeInstructions", []) - if isinstance(directions, str): - try: - directions = json.loads(directions) - except Exception: - directions = [directions] + for _, row in df.iterrows(): + raw_ingredients = row.get("RecipeIngredientParts", []) + if isinstance(raw_ingredients, str): + try: + raw_ingredients = json.loads(raw_ingredients) + except Exception: + raw_ingredients = [raw_ingredients] + raw_ingredients = [str(i) for i in (raw_ingredients or [])] + ingredient_names = extract_ingredient_names(raw_ingredients) - batch.append(( - str(row.get("RecipeId", "")), - str(row.get("Name", ""))[:500], - json.dumps(raw_ingredients), - json.dumps(ingredient_names), - json.dumps([str(d) for d in (directions or [])]), - str(row.get("RecipeCategory", "") or ""), - json.dumps(list(row.get("Keywords", []) or [])), - float(row.get("Calories") or 0) or None, - float(row.get("FatContent") or 0) or None, - float(row.get("ProteinContent") or 0) or None, - float(row.get("SodiumContent") or 0) or None, - json.dumps(coverage), - )) + profiles = [] + for name in ingredient_names: + if name in profile_index: + profiles.append({"elements": profile_index[name]}) + coverage = compute_element_coverage(profiles) - if len(batch) >= batch_size: + directions = row.get("RecipeInstructions", []) + if isinstance(directions, str): + try: + directions = json.loads(directions) + except Exception: + directions = [directions] + + batch.append(( + str(row.get("RecipeId", "")), + str(row.get("Name", ""))[:500], + json.dumps(raw_ingredients), + json.dumps(ingredient_names), + json.dumps([str(d) for d in (directions or [])]), + str(row.get("RecipeCategory", "") or ""), + json.dumps(list(row.get("Keywords", []) or [])), + float(row.get("Calories") or 0) or None, + float(row.get("FatContent") or 0) or None, + float(row.get("ProteinContent") or 0) or None, + float(row.get("SodiumContent") or 0) or None, + json.dumps(coverage), + )) + + if len(batch) >= batch_size: + before = conn.total_changes + conn.executemany(""" + INSERT OR IGNORE INTO recipes + (external_id, title, ingredients, ingredient_names, directions, + category, keywords, calories, fat_g, protein_g, sodium_mg, element_coverage) + VALUES (?,?,?,?,?,?,?,?,?,?,?,?) + """, batch) + conn.commit() + inserted += conn.total_changes - before + print(f" {inserted} recipes inserted...") + batch = [] + + if batch: + before = conn.total_changes conn.executemany(""" INSERT OR IGNORE INTO recipes (external_id, title, ingredients, ingredient_names, directions, @@ -109,21 +130,11 @@ def build(db_path: Path, recipes_path: Path, batch_size: int = 10000) -> None: VALUES (?,?,?,?,?,?,?,?,?,?,?,?) """, batch) conn.commit() - inserted += len(batch) - print(f" {inserted} recipes inserted...") - batch = [] + inserted += conn.total_changes - before - if batch: - conn.executemany(""" - INSERT OR IGNORE INTO recipes - (external_id, title, ingredients, ingredient_names, directions, - category, keywords, calories, fat_g, protein_g, sodium_mg, element_coverage) - VALUES (?,?,?,?,?,?,?,?,?,?,?,?) - """, batch) conn.commit() - inserted += len(batch) - - conn.close() + finally: + conn.close() print(f"Total: {inserted} recipes inserted") diff --git a/scripts/pipeline/derive_substitutions.py b/scripts/pipeline/derive_substitutions.py index 1030ea6..72f0277 100644 --- a/scripts/pipeline/derive_substitutions.py +++ b/scripts/pipeline/derive_substitutions.py @@ -5,8 +5,7 @@ GPL-3.0 source -- derived annotations only, raw pairs not shipped. Usage: conda run -n job-seeker python scripts/pipeline/derive_substitutions.py \ --db /path/to/kiwi.db \ - --recipepairs data/recipepairs.parquet \ - --recipes data/recipes_foodcom.parquet + --recipepairs data/recipepairs.parquet """ from __future__ import annotations import argparse @@ -31,72 +30,73 @@ def diff_ingredients(base: list[str], target: list[str]) -> tuple[list[str], lis return removed, added -def build(db_path: Path, recipepairs_path: Path, recipes_path: Path) -> None: +def build(db_path: Path, recipepairs_path: Path) -> None: conn = sqlite3.connect(db_path) + try: + print("Loading recipe ingredient index...") + recipe_ingredients: dict[str, list[str]] = {} + for row in conn.execute("SELECT external_id, ingredient_names FROM recipes"): + recipe_ingredients[str(row[0])] = json.loads(row[1]) - print("Loading recipe ingredient index...") - recipe_ingredients: dict[str, list[str]] = {} - for row in conn.execute("SELECT external_id, ingredient_names FROM recipes"): - recipe_ingredients[str(row[0])] = json.loads(row[1]) + df = pd.read_parquet(recipepairs_path) + pair_counts: dict[tuple, dict] = defaultdict(lambda: {"count": 0}) - df = pd.read_parquet(recipepairs_path) - pair_counts: dict[tuple, dict] = defaultdict(lambda: {"count": 0}) + print("Diffing recipe pairs...") + for _, row in df.iterrows(): + base_id = str(row.get("base", "")) + target_id = str(row.get("target", "")) + base_ings = recipe_ingredients.get(base_id, []) + target_ings = recipe_ingredients.get(target_id, []) + if not base_ings or not target_ings: + continue - print("Diffing recipe pairs...") - for _, row in df.iterrows(): - base_id = str(row.get("base", "")) - target_id = str(row.get("target", "")) - base_ings = recipe_ingredients.get(base_id, []) - target_ings = recipe_ingredients.get(target_id, []) - if not base_ings or not target_ings: - continue + removed, added = diff_ingredients(base_ings, target_ings) + if len(removed) != 1 or len(added) != 1: + continue - removed, added = diff_ingredients(base_ings, target_ings) - if len(removed) != 1 or len(added) != 1: - continue + original = removed[0] + substitute = added[0] + constraints = [c for c in CONSTRAINT_COLS if row.get(c, 0)] + for constraint in constraints: + key = (original, substitute, constraint) + pair_counts[key]["count"] += 1 - original = removed[0] - substitute = added[0] - constraints = [c for c in CONSTRAINT_COLS if row.get(c, 0)] - for constraint in constraints: - key = (original, substitute, constraint) - pair_counts[key]["count"] += 1 + def get_profile(name: str) -> dict: + row = conn.execute( + "SELECT fat_pct, moisture_pct, glutamate_mg, protein_pct " + "FROM ingredient_profiles WHERE name = ?", (name,) + ).fetchone() + if row: + return {"fat": row[0] or 0, "moisture": row[1] or 0, + "glutamate": row[2] or 0, "protein": row[3] or 0} + return {"fat": 0, "moisture": 0, "glutamate": 0, "protein": 0} - def get_profile(name: str) -> dict: - row = conn.execute( - "SELECT fat_pct, moisture_pct, glutamate_mg, protein_pct " - "FROM ingredient_profiles WHERE name = ?", (name,) - ).fetchone() - if row: - return {"fat": row[0] or 0, "moisture": row[1] or 0, - "glutamate": row[2] or 0, "protein": row[3] or 0} - return {"fat": 0, "moisture": 0, "glutamate": 0, "protein": 0} + print("Writing substitution pairs...") + inserted = 0 + for (original, substitute, constraint), data in pair_counts.items(): + if data["count"] < 3: + continue + p_orig = get_profile(original) + p_sub = get_profile(substitute) + conn.execute(""" + INSERT OR REPLACE INTO substitution_pairs + (original_name, substitute_name, constraint_label, + fat_delta, moisture_delta, glutamate_delta, protein_delta, + occurrence_count, source) + VALUES (?,?,?,?,?,?,?,?,?) + """, ( + original, substitute, constraint, + round(p_sub["fat"] - p_orig["fat"], 2), + round(p_sub["moisture"] - p_orig["moisture"], 2), + round(p_sub["glutamate"] - p_orig["glutamate"], 2), + round(p_sub["protein"] - p_orig["protein"], 2), + data["count"], "derived", + )) + inserted += 1 - print("Writing substitution pairs...") - inserted = 0 - for (original, substitute, constraint), data in pair_counts.items(): - if data["count"] < 3: - continue - p_orig = get_profile(original) - p_sub = get_profile(substitute) - conn.execute(""" - INSERT OR REPLACE INTO substitution_pairs - (original_name, substitute_name, constraint_label, - fat_delta, moisture_delta, glutamate_delta, protein_delta, - occurrence_count, source) - VALUES (?,?,?,?,?,?,?,?,?) - """, ( - original, substitute, constraint, - round(p_sub["fat"] - p_orig["fat"], 2), - round(p_sub["moisture"] - p_orig["moisture"], 2), - round(p_sub["glutamate"] - p_orig["glutamate"], 2), - round(p_sub["protein"] - p_orig["protein"], 2), - data["count"], "derived", - )) - inserted += 1 - - conn.commit() - conn.close() + conn.commit() + finally: + conn.close() print(f"Inserted {inserted} substitution pairs (min 3 occurrences)") @@ -104,6 +104,5 @@ if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--db", required=True, type=Path) parser.add_argument("--recipepairs", required=True, type=Path) - parser.add_argument("--recipes", required=True, type=Path) args = parser.parse_args() - build(args.db, args.recipepairs, args.recipes) + build(args.db, args.recipepairs)