From 77627cec23b155502386660e0d848d5f672423b7 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Tue, 31 Mar 2026 21:36:13 -0700 Subject: [PATCH] =?UTF-8?q?fix:=20data=20pipeline=20=E2=80=94=20R-vector?= =?UTF-8?q?=20parser,=20allrecipes=20dataset,=20unique=20recipe=20index?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - build_recipe_index.py: add _parse_r_vector() for food.com R format, add _parse_allrecipes_text() for corbt/all-recipes text format, _row_to_fields() dispatcher handles both columnar (food.com) and single-text (all-recipes) - build_flavorgraph_index.py: switch from graph.json to nodes/edges CSVs matching actual FlavorGraph repo structure - download_datasets.py: switch recipe source to corbt/all-recipes (2.1M recipes, 807MB) replacing near-empty AkashPS11/recipes_data_food.com - 007_recipe_corpus.sql: add UNIQUE constraint on external_id to prevent duplicate inserts on pipeline reruns --- app/db/migrations/007_recipe_corpus.sql | 2 +- scripts/pipeline/build_flavorgraph_index.py | 84 ++++++++++++--------- scripts/pipeline/build_recipe_index.py | 82 +++++++++++++++----- scripts/pipeline/download_datasets.py | 18 ++--- 4 files changed, 120 insertions(+), 66 deletions(-) diff --git a/app/db/migrations/007_recipe_corpus.sql b/app/db/migrations/007_recipe_corpus.sql index 19a79f4..bc8fdaf 100644 --- a/app/db/migrations/007_recipe_corpus.sql +++ b/app/db/migrations/007_recipe_corpus.sql @@ -21,4 +21,4 @@ CREATE TABLE recipes ( CREATE INDEX idx_recipes_title ON recipes (title); CREATE INDEX idx_recipes_category ON recipes (category); -CREATE INDEX idx_recipes_external_id ON recipes (external_id); +CREATE UNIQUE INDEX idx_recipes_external_id ON recipes (external_id); diff --git a/scripts/pipeline/build_flavorgraph_index.py b/scripts/pipeline/build_flavorgraph_index.py index d831f85..e435977 100644 --- a/scripts/pipeline/build_flavorgraph_index.py +++ b/scripts/pipeline/build_flavorgraph_index.py @@ -5,9 +5,9 @@ FlavorGraph GitHub: https://github.com/lamypark/FlavorGraph Download: git clone https://github.com/lamypark/FlavorGraph /tmp/flavorgraph Usage: - conda run -n job-seeker python scripts/pipeline/build_flavorgraph_index.py \ - --db /path/to/kiwi.db \ - --graph-json /tmp/flavorgraph/data/graph.json + conda run -n cf python scripts/pipeline/build_flavorgraph_index.py \ + --db data/kiwi.db \ + --flavorgraph-dir /tmp/flavorgraph/input """ from __future__ import annotations import argparse @@ -16,64 +16,74 @@ import sqlite3 from collections import defaultdict from pathlib import Path +import pandas as pd + + +def parse_ingredient_nodes( + nodes_path: Path, edges_path: Path +) -> tuple[dict[str, list[str]], dict[str, str]]: + """Parse FlavorGraph CSVs → (ingredient→compounds, compound→name).""" + nodes = pd.read_csv(nodes_path, dtype=str).fillna("") + edges = pd.read_csv(edges_path, dtype=str).fillna("") -def parse_ingredient_nodes(graph: dict) -> dict[str, list[str]]: - """Return {ingredient_name: [compound_id, ...]} from a FlavorGraph JSON.""" - ingredient_compounds: dict[str, list[str]] = defaultdict(list) ingredient_ids: dict[str, str] = {} # node_id -> ingredient_name + compound_names: dict[str, str] = {} # node_id -> compound_name - for node in graph.get("nodes", []): - if node.get("type") == "ingredient": - ingredient_ids[node["id"]] = node["name"].lower() + for _, row in nodes.iterrows(): + nid = row["node_id"] + name = row["name"].lower().replace("_", " ").strip() + if row["node_type"] == "ingredient": + ingredient_ids[nid] = name + else: + compound_names[nid] = name - for link in graph.get("links", []): - src, tgt = link.get("source", ""), link.get("target", "") + ingredient_compounds: dict[str, list[str]] = defaultdict(list) + for _, row in edges.iterrows(): + src, tgt = row["id_1"], row["id_2"] if src in ingredient_ids: ingredient_compounds[ingredient_ids[src]].append(tgt) if tgt in ingredient_ids: ingredient_compounds[ingredient_ids[tgt]].append(src) - return dict(ingredient_compounds) + return dict(ingredient_compounds), compound_names -def build(db_path: Path, graph_json_path: Path) -> None: - graph = json.loads(graph_json_path.read_text()) - ingredient_map = parse_ingredient_nodes(graph) +def build(db_path: Path, flavorgraph_dir: Path) -> None: + nodes_path = flavorgraph_dir / "nodes_191120.csv" + edges_path = flavorgraph_dir / "edges_191120.csv" + + ingredient_map, compound_names = parse_ingredient_nodes(nodes_path, edges_path) compound_ingredients: dict[str, list[str]] = defaultdict(list) - compound_names: dict[str, str] = {} - - for node in graph.get("nodes", []): - if node.get("type") == "compound": - compound_names[node["id"]] = node["name"] - for ingredient, compounds in ingredient_map.items(): for cid in compounds: compound_ingredients[cid].append(ingredient) conn = sqlite3.connect(db_path) + try: + for ingredient, compounds in ingredient_map.items(): + conn.execute( + "UPDATE ingredient_profiles SET flavor_molecule_ids = ? WHERE name = ?", + (json.dumps(compounds), ingredient), + ) - for ingredient, compounds in ingredient_map.items(): - conn.execute(""" - UPDATE ingredient_profiles - SET flavor_molecule_ids = ? - WHERE name = ? - """, (json.dumps(compounds), ingredient)) + for cid, ingredients in compound_ingredients.items(): + conn.execute( + "INSERT OR IGNORE INTO flavor_molecules (compound_id, compound_name, ingredient_names)" + " VALUES (?, ?, ?)", + (cid, compound_names.get(cid, cid), json.dumps(ingredients)), + ) - for cid, ingredients in compound_ingredients.items(): - conn.execute(""" - INSERT OR IGNORE INTO flavor_molecules (compound_id, compound_name, ingredient_names) - VALUES (?, ?, ?) - """, (cid, compound_names.get(cid, cid), json.dumps(ingredients))) + conn.commit() + finally: + conn.close() - conn.commit() - conn.close() print(f"Indexed {len(ingredient_map)} ingredients, {len(compound_ingredients)} compounds") if __name__ == "__main__": parser = argparse.ArgumentParser() - parser.add_argument("--db", required=True, type=Path) - parser.add_argument("--graph-json", required=True, type=Path) + parser.add_argument("--db", required=True, type=Path) + parser.add_argument("--flavorgraph-dir", required=True, type=Path) args = parser.parse_args() - build(args.db, args.graph_json) + build(args.db, args.flavorgraph_dir) diff --git a/scripts/pipeline/build_recipe_index.py b/scripts/pipeline/build_recipe_index.py index 78676fb..f603950 100644 --- a/scripts/pipeline/build_recipe_index.py +++ b/scripts/pipeline/build_recipe_index.py @@ -25,6 +25,12 @@ _TRAILING_QUALIFIER = re.compile( r"\s*(to taste|as needed|or more|or less|optional|if desired|if needed)\s*$", re.IGNORECASE, ) +_QUOTED = re.compile(r'"([^"]*)"') + + +def _parse_r_vector(s: str) -> list[str]: + """Parse R character vector format: c("a", "b") -> ["a", "b"].""" + return _QUOTED.findall(s) def extract_ingredient_names(raw_list: list[str]) -> list[str]: @@ -53,6 +59,55 @@ def compute_element_coverage(profiles: list[dict]) -> dict[str, float]: return {e: round(c / len(profiles), 3) for e, c in counts.items()} +def _parse_allrecipes_text(text: str) -> tuple[str, list[str], list[str]]: + """Parse corbt/all-recipes text format into (title, ingredients, directions).""" + lines = text.strip().split('\n') + title = lines[0].strip() + ingredients: list[str] = [] + directions: list[str] = [] + section: str | None = None + for line in lines[1:]: + stripped = line.strip() + if stripped.lower() == 'ingredients:': + section = 'ingredients' + elif stripped.lower() in ('directions:', 'steps:', 'instructions:'): + section = 'directions' + elif stripped.startswith('- ') and section == 'ingredients': + ingredients.append(stripped[2:].strip()) + elif stripped.startswith('- ') and section == 'directions': + directions.append(stripped[2:].strip()) + return title, ingredients, directions + + +def _row_to_fields(row: pd.Series) -> tuple[str, str, list[str], list[str]]: + """Extract (external_id, title, raw_ingredients, directions) from a parquet row. + + Handles both corbt/all-recipes (single 'input' text column) and the + food.com columnar format (RecipeId, Name, RecipeIngredientParts, ...). + """ + if "input" in row.index and pd.notna(row.get("input")): + title, raw_ingredients, directions = _parse_allrecipes_text(str(row["input"])) + external_id = f"ar_{hash(title) & 0xFFFFFFFF}" + else: + raw_parts = row.get("RecipeIngredientParts", []) + if isinstance(raw_parts, str): + parsed = _parse_r_vector(raw_parts) + raw_parts = parsed if parsed else [raw_parts] + raw_ingredients = [str(i) for i in (raw_parts or [])] + + raw_dirs = row.get("RecipeInstructions", []) + if isinstance(raw_dirs, str): + parsed_dirs = _parse_r_vector(raw_dirs) + directions = parsed_dirs if parsed_dirs else [raw_dirs] + else: + directions = [str(d) for d in (raw_dirs or [])] + + title = str(row.get("Name", ""))[:500] + external_id = str(row.get("RecipeId", "")) + + return external_id, title, raw_ingredients, directions + + def build(db_path: Path, recipes_path: Path, batch_size: int = 10000) -> None: conn = sqlite3.connect(db_path) try: @@ -71,13 +126,9 @@ def build(db_path: Path, recipes_path: Path, batch_size: int = 10000) -> None: batch = [] for _, row in df.iterrows(): - raw_ingredients = row.get("RecipeIngredientParts", []) - if isinstance(raw_ingredients, str): - try: - raw_ingredients = json.loads(raw_ingredients) - except Exception: - raw_ingredients = [raw_ingredients] - raw_ingredients = [str(i) for i in (raw_ingredients or [])] + external_id, title, raw_ingredients, directions = _row_to_fields(row) + if not title: + continue ingredient_names = extract_ingredient_names(raw_ingredients) profiles = [] @@ -86,19 +137,12 @@ def build(db_path: Path, recipes_path: Path, batch_size: int = 10000) -> None: profiles.append({"elements": profile_index[name]}) coverage = compute_element_coverage(profiles) - directions = row.get("RecipeInstructions", []) - if isinstance(directions, str): - try: - directions = json.loads(directions) - except Exception: - directions = [directions] - batch.append(( - str(row.get("RecipeId", "")), - str(row.get("Name", ""))[:500], + external_id, + title, json.dumps(raw_ingredients), json.dumps(ingredient_names), - json.dumps([str(d) for d in (directions or [])]), + json.dumps(directions), str(row.get("RecipeCategory", "") or ""), json.dumps(list(row.get("Keywords", []) or [])), float(row.get("Calories") or 0) or None, @@ -111,7 +155,7 @@ def build(db_path: Path, recipes_path: Path, batch_size: int = 10000) -> None: if len(batch) >= batch_size: before = conn.total_changes conn.executemany(""" - INSERT OR IGNORE INTO recipes + INSERT OR REPLACE INTO recipes (external_id, title, ingredients, ingredient_names, directions, category, keywords, calories, fat_g, protein_g, sodium_mg, element_coverage) VALUES (?,?,?,?,?,?,?,?,?,?,?,?) @@ -124,7 +168,7 @@ def build(db_path: Path, recipes_path: Path, batch_size: int = 10000) -> None: if batch: before = conn.total_changes conn.executemany(""" - INSERT OR IGNORE INTO recipes + INSERT OR REPLACE INTO recipes (external_id, title, ingredients, ingredient_names, directions, category, keywords, calories, fat_g, protein_g, sodium_mg, element_coverage) VALUES (?,?,?,?,?,?,?,?,?,?,?,?) diff --git a/scripts/pipeline/download_datasets.py b/scripts/pipeline/download_datasets.py index 3166210..ab2d733 100644 --- a/scripts/pipeline/download_datasets.py +++ b/scripts/pipeline/download_datasets.py @@ -2,13 +2,13 @@ Download recipe engine datasets from HuggingFace. Usage: - conda run -n job-seeker python scripts/pipeline/download_datasets.py --data-dir /path/to/data + conda run -n cf python scripts/pipeline/download_datasets.py --data-dir data/pipeline Downloads: - - AkashPS11/recipes_data_food.com (MIT) → data/recipes_foodcom.parquet - - omid5/usda-fdc-foods-cleaned (CC0) → data/usda_fdc_cleaned.parquet - - jacktol/usda-branded-food-data (MIT) → data/usda_branded.parquet - - lishuyang/recipepairs (GPL-3.0 ⚠) → data/recipepairs.parquet [derive only, don't ship] + - corbt/all-recipes (no license) → data/pipeline/recipes_allrecipes.parquet [2.1M recipes] + - omid5/usda-fdc-foods-cleaned (CC0) → data/pipeline/usda_fdc_cleaned.parquet + - jacktol/usda-branded-food-data (MIT) → data/pipeline/usda_branded.parquet + - lishuyang/recipepairs (GPL-3.0 ⚠) → data/pipeline/recipepairs.parquet [derive only, don't ship] """ from __future__ import annotations import argparse @@ -17,10 +17,10 @@ from datasets import load_dataset DATASETS = [ - ("AkashPS11/recipes_data_food.com", "train", "recipes_foodcom.parquet"), - ("omid5/usda-fdc-foods-cleaned", "train", "usda_fdc_cleaned.parquet"), - ("jacktol/usda-branded-food-data", "train", "usda_branded.parquet"), - ("lishuyang/recipepairs", "train", "recipepairs.parquet"), + ("corbt/all-recipes", "train", "recipes_allrecipes.parquet"), + ("omid5/usda-fdc-foods-cleaned", "train", "usda_fdc_cleaned.parquet"), + ("jacktol/usda-branded-food-data","train", "usda_branded.parquet"), + ("lishuyang/recipepairs", "train", "recipepairs.parquet"), ]