fix: data pipeline — R-vector parser, allrecipes dataset, unique recipe index

- build_recipe_index.py: add _parse_r_vector() for food.com R format, add
  _parse_allrecipes_text() for corbt/all-recipes text format, _row_to_fields()
  dispatcher handles both columnar (food.com) and single-text (all-recipes)
- build_flavorgraph_index.py: switch from graph.json to nodes/edges CSVs
  matching actual FlavorGraph repo structure
- download_datasets.py: switch recipe source to corbt/all-recipes (2.1M
  recipes, 807MB) replacing near-empty AkashPS11/recipes_data_food.com
- 007_recipe_corpus.sql: add UNIQUE constraint on external_id to prevent
  duplicate inserts on pipeline reruns
This commit is contained in:
pyr0ball 2026-03-31 21:36:13 -07:00
parent 9b890f5fde
commit 77627cec23
4 changed files with 120 additions and 66 deletions

View file

@ -21,4 +21,4 @@ CREATE TABLE recipes (
CREATE INDEX idx_recipes_title ON recipes (title);
CREATE INDEX idx_recipes_category ON recipes (category);
CREATE INDEX idx_recipes_external_id ON recipes (external_id);
CREATE UNIQUE INDEX idx_recipes_external_id ON recipes (external_id);

View file

@ -5,9 +5,9 @@ FlavorGraph GitHub: https://github.com/lamypark/FlavorGraph
Download: git clone https://github.com/lamypark/FlavorGraph /tmp/flavorgraph
Usage:
conda run -n job-seeker python scripts/pipeline/build_flavorgraph_index.py \
--db /path/to/kiwi.db \
--graph-json /tmp/flavorgraph/data/graph.json
conda run -n cf python scripts/pipeline/build_flavorgraph_index.py \
--db data/kiwi.db \
--flavorgraph-dir /tmp/flavorgraph/input
"""
from __future__ import annotations
import argparse
@ -16,64 +16,74 @@ import sqlite3
from collections import defaultdict
from pathlib import Path
import pandas as pd
def parse_ingredient_nodes(
nodes_path: Path, edges_path: Path
) -> tuple[dict[str, list[str]], dict[str, str]]:
"""Parse FlavorGraph CSVs → (ingredient→compounds, compound→name)."""
nodes = pd.read_csv(nodes_path, dtype=str).fillna("")
edges = pd.read_csv(edges_path, dtype=str).fillna("")
def parse_ingredient_nodes(graph: dict) -> dict[str, list[str]]:
"""Return {ingredient_name: [compound_id, ...]} from a FlavorGraph JSON."""
ingredient_compounds: dict[str, list[str]] = defaultdict(list)
ingredient_ids: dict[str, str] = {} # node_id -> ingredient_name
compound_names: dict[str, str] = {} # node_id -> compound_name
for node in graph.get("nodes", []):
if node.get("type") == "ingredient":
ingredient_ids[node["id"]] = node["name"].lower()
for _, row in nodes.iterrows():
nid = row["node_id"]
name = row["name"].lower().replace("_", " ").strip()
if row["node_type"] == "ingredient":
ingredient_ids[nid] = name
else:
compound_names[nid] = name
for link in graph.get("links", []):
src, tgt = link.get("source", ""), link.get("target", "")
ingredient_compounds: dict[str, list[str]] = defaultdict(list)
for _, row in edges.iterrows():
src, tgt = row["id_1"], row["id_2"]
if src in ingredient_ids:
ingredient_compounds[ingredient_ids[src]].append(tgt)
if tgt in ingredient_ids:
ingredient_compounds[ingredient_ids[tgt]].append(src)
return dict(ingredient_compounds)
return dict(ingredient_compounds), compound_names
def build(db_path: Path, graph_json_path: Path) -> None:
graph = json.loads(graph_json_path.read_text())
ingredient_map = parse_ingredient_nodes(graph)
def build(db_path: Path, flavorgraph_dir: Path) -> None:
nodes_path = flavorgraph_dir / "nodes_191120.csv"
edges_path = flavorgraph_dir / "edges_191120.csv"
ingredient_map, compound_names = parse_ingredient_nodes(nodes_path, edges_path)
compound_ingredients: dict[str, list[str]] = defaultdict(list)
compound_names: dict[str, str] = {}
for node in graph.get("nodes", []):
if node.get("type") == "compound":
compound_names[node["id"]] = node["name"]
for ingredient, compounds in ingredient_map.items():
for cid in compounds:
compound_ingredients[cid].append(ingredient)
conn = sqlite3.connect(db_path)
try:
for ingredient, compounds in ingredient_map.items():
conn.execute("""
UPDATE ingredient_profiles
SET flavor_molecule_ids = ?
WHERE name = ?
""", (json.dumps(compounds), ingredient))
conn.execute(
"UPDATE ingredient_profiles SET flavor_molecule_ids = ? WHERE name = ?",
(json.dumps(compounds), ingredient),
)
for cid, ingredients in compound_ingredients.items():
conn.execute("""
INSERT OR IGNORE INTO flavor_molecules (compound_id, compound_name, ingredient_names)
VALUES (?, ?, ?)
""", (cid, compound_names.get(cid, cid), json.dumps(ingredients)))
conn.execute(
"INSERT OR IGNORE INTO flavor_molecules (compound_id, compound_name, ingredient_names)"
" VALUES (?, ?, ?)",
(cid, compound_names.get(cid, cid), json.dumps(ingredients)),
)
conn.commit()
finally:
conn.close()
print(f"Indexed {len(ingredient_map)} ingredients, {len(compound_ingredients)} compounds")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--db", required=True, type=Path)
parser.add_argument("--graph-json", required=True, type=Path)
parser.add_argument("--flavorgraph-dir", required=True, type=Path)
args = parser.parse_args()
build(args.db, args.graph_json)
build(args.db, args.flavorgraph_dir)

View file

@ -25,6 +25,12 @@ _TRAILING_QUALIFIER = re.compile(
r"\s*(to taste|as needed|or more|or less|optional|if desired|if needed)\s*$",
re.IGNORECASE,
)
_QUOTED = re.compile(r'"([^"]*)"')
def _parse_r_vector(s: str) -> list[str]:
"""Parse R character vector format: c("a", "b") -> ["a", "b"]."""
return _QUOTED.findall(s)
def extract_ingredient_names(raw_list: list[str]) -> list[str]:
@ -53,6 +59,55 @@ def compute_element_coverage(profiles: list[dict]) -> dict[str, float]:
return {e: round(c / len(profiles), 3) for e, c in counts.items()}
def _parse_allrecipes_text(text: str) -> tuple[str, list[str], list[str]]:
"""Parse corbt/all-recipes text format into (title, ingredients, directions)."""
lines = text.strip().split('\n')
title = lines[0].strip()
ingredients: list[str] = []
directions: list[str] = []
section: str | None = None
for line in lines[1:]:
stripped = line.strip()
if stripped.lower() == 'ingredients:':
section = 'ingredients'
elif stripped.lower() in ('directions:', 'steps:', 'instructions:'):
section = 'directions'
elif stripped.startswith('- ') and section == 'ingredients':
ingredients.append(stripped[2:].strip())
elif stripped.startswith('- ') and section == 'directions':
directions.append(stripped[2:].strip())
return title, ingredients, directions
def _row_to_fields(row: pd.Series) -> tuple[str, str, list[str], list[str]]:
"""Extract (external_id, title, raw_ingredients, directions) from a parquet row.
Handles both corbt/all-recipes (single 'input' text column) and the
food.com columnar format (RecipeId, Name, RecipeIngredientParts, ...).
"""
if "input" in row.index and pd.notna(row.get("input")):
title, raw_ingredients, directions = _parse_allrecipes_text(str(row["input"]))
external_id = f"ar_{hash(title) & 0xFFFFFFFF}"
else:
raw_parts = row.get("RecipeIngredientParts", [])
if isinstance(raw_parts, str):
parsed = _parse_r_vector(raw_parts)
raw_parts = parsed if parsed else [raw_parts]
raw_ingredients = [str(i) for i in (raw_parts or [])]
raw_dirs = row.get("RecipeInstructions", [])
if isinstance(raw_dirs, str):
parsed_dirs = _parse_r_vector(raw_dirs)
directions = parsed_dirs if parsed_dirs else [raw_dirs]
else:
directions = [str(d) for d in (raw_dirs or [])]
title = str(row.get("Name", ""))[:500]
external_id = str(row.get("RecipeId", ""))
return external_id, title, raw_ingredients, directions
def build(db_path: Path, recipes_path: Path, batch_size: int = 10000) -> None:
conn = sqlite3.connect(db_path)
try:
@ -71,13 +126,9 @@ def build(db_path: Path, recipes_path: Path, batch_size: int = 10000) -> None:
batch = []
for _, row in df.iterrows():
raw_ingredients = row.get("RecipeIngredientParts", [])
if isinstance(raw_ingredients, str):
try:
raw_ingredients = json.loads(raw_ingredients)
except Exception:
raw_ingredients = [raw_ingredients]
raw_ingredients = [str(i) for i in (raw_ingredients or [])]
external_id, title, raw_ingredients, directions = _row_to_fields(row)
if not title:
continue
ingredient_names = extract_ingredient_names(raw_ingredients)
profiles = []
@ -86,19 +137,12 @@ def build(db_path: Path, recipes_path: Path, batch_size: int = 10000) -> None:
profiles.append({"elements": profile_index[name]})
coverage = compute_element_coverage(profiles)
directions = row.get("RecipeInstructions", [])
if isinstance(directions, str):
try:
directions = json.loads(directions)
except Exception:
directions = [directions]
batch.append((
str(row.get("RecipeId", "")),
str(row.get("Name", ""))[:500],
external_id,
title,
json.dumps(raw_ingredients),
json.dumps(ingredient_names),
json.dumps([str(d) for d in (directions or [])]),
json.dumps(directions),
str(row.get("RecipeCategory", "") or ""),
json.dumps(list(row.get("Keywords", []) or [])),
float(row.get("Calories") or 0) or None,
@ -111,7 +155,7 @@ def build(db_path: Path, recipes_path: Path, batch_size: int = 10000) -> None:
if len(batch) >= batch_size:
before = conn.total_changes
conn.executemany("""
INSERT OR IGNORE INTO recipes
INSERT OR REPLACE INTO recipes
(external_id, title, ingredients, ingredient_names, directions,
category, keywords, calories, fat_g, protein_g, sodium_mg, element_coverage)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?)
@ -124,7 +168,7 @@ def build(db_path: Path, recipes_path: Path, batch_size: int = 10000) -> None:
if batch:
before = conn.total_changes
conn.executemany("""
INSERT OR IGNORE INTO recipes
INSERT OR REPLACE INTO recipes
(external_id, title, ingredients, ingredient_names, directions,
category, keywords, calories, fat_g, protein_g, sodium_mg, element_coverage)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?)

View file

@ -2,13 +2,13 @@
Download recipe engine datasets from HuggingFace.
Usage:
conda run -n job-seeker python scripts/pipeline/download_datasets.py --data-dir /path/to/data
conda run -n cf python scripts/pipeline/download_datasets.py --data-dir data/pipeline
Downloads:
- AkashPS11/recipes_data_food.com (MIT) data/recipes_foodcom.parquet
- omid5/usda-fdc-foods-cleaned (CC0) data/usda_fdc_cleaned.parquet
- jacktol/usda-branded-food-data (MIT) data/usda_branded.parquet
- lishuyang/recipepairs (GPL-3.0 ) data/recipepairs.parquet [derive only, don't ship]
- corbt/all-recipes (no license) data/pipeline/recipes_allrecipes.parquet [2.1M recipes]
- omid5/usda-fdc-foods-cleaned (CC0) data/pipeline/usda_fdc_cleaned.parquet
- jacktol/usda-branded-food-data (MIT) data/pipeline/usda_branded.parquet
- lishuyang/recipepairs (GPL-3.0 ) data/pipeline/recipepairs.parquet [derive only, don't ship]
"""
from __future__ import annotations
import argparse
@ -17,7 +17,7 @@ from datasets import load_dataset
DATASETS = [
("AkashPS11/recipes_data_food.com", "train", "recipes_foodcom.parquet"),
("corbt/all-recipes", "train", "recipes_allrecipes.parquet"),
("omid5/usda-fdc-foods-cleaned", "train", "usda_fdc_cleaned.parquet"),
("jacktol/usda-branded-food-data","train", "usda_branded.parquet"),
("lishuyang/recipepairs", "train", "recipepairs.parquet"),