fix: pipeline scripts — connection safety, remove unused recipes_path arg, fix inserted counter, pre-load profile index
This commit is contained in:
parent
e57ae74e27
commit
e44d36e32f
2 changed files with 125 additions and 115 deletions
|
|
@ -55,53 +55,74 @@ def compute_element_coverage(profiles: list[dict]) -> dict[str, float]:
|
|||
|
||||
def build(db_path: Path, recipes_path: Path, batch_size: int = 10000) -> None:
|
||||
conn = sqlite3.connect(db_path)
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
df = pd.read_parquet(recipes_path)
|
||||
inserted = 0
|
||||
batch = []
|
||||
try:
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
|
||||
for _, row in df.iterrows():
|
||||
raw_ingredients = row.get("RecipeIngredientParts", [])
|
||||
if isinstance(raw_ingredients, str):
|
||||
# Pre-load ingredient element profiles to avoid N+1 queries
|
||||
profile_index: dict[str, list[str]] = {}
|
||||
for row in conn.execute("SELECT name, elements FROM ingredient_profiles"):
|
||||
try:
|
||||
raw_ingredients = json.loads(raw_ingredients)
|
||||
profile_index[row[0]] = json.loads(row[1])
|
||||
except Exception:
|
||||
raw_ingredients = [raw_ingredients]
|
||||
raw_ingredients = [str(i) for i in (raw_ingredients or [])]
|
||||
ingredient_names = extract_ingredient_names(raw_ingredients)
|
||||
pass
|
||||
|
||||
profiles = []
|
||||
for name in ingredient_names:
|
||||
row_p = conn.execute(
|
||||
"SELECT elements FROM ingredient_profiles WHERE name = ?", (name,)
|
||||
).fetchone()
|
||||
if row_p:
|
||||
profiles.append({"elements": json.loads(row_p[0])})
|
||||
coverage = compute_element_coverage(profiles)
|
||||
df = pd.read_parquet(recipes_path)
|
||||
inserted = 0
|
||||
batch = []
|
||||
|
||||
directions = row.get("RecipeInstructions", [])
|
||||
if isinstance(directions, str):
|
||||
try:
|
||||
directions = json.loads(directions)
|
||||
except Exception:
|
||||
directions = [directions]
|
||||
for _, row in df.iterrows():
|
||||
raw_ingredients = row.get("RecipeIngredientParts", [])
|
||||
if isinstance(raw_ingredients, str):
|
||||
try:
|
||||
raw_ingredients = json.loads(raw_ingredients)
|
||||
except Exception:
|
||||
raw_ingredients = [raw_ingredients]
|
||||
raw_ingredients = [str(i) for i in (raw_ingredients or [])]
|
||||
ingredient_names = extract_ingredient_names(raw_ingredients)
|
||||
|
||||
batch.append((
|
||||
str(row.get("RecipeId", "")),
|
||||
str(row.get("Name", ""))[:500],
|
||||
json.dumps(raw_ingredients),
|
||||
json.dumps(ingredient_names),
|
||||
json.dumps([str(d) for d in (directions or [])]),
|
||||
str(row.get("RecipeCategory", "") or ""),
|
||||
json.dumps(list(row.get("Keywords", []) or [])),
|
||||
float(row.get("Calories") or 0) or None,
|
||||
float(row.get("FatContent") or 0) or None,
|
||||
float(row.get("ProteinContent") or 0) or None,
|
||||
float(row.get("SodiumContent") or 0) or None,
|
||||
json.dumps(coverage),
|
||||
))
|
||||
profiles = []
|
||||
for name in ingredient_names:
|
||||
if name in profile_index:
|
||||
profiles.append({"elements": profile_index[name]})
|
||||
coverage = compute_element_coverage(profiles)
|
||||
|
||||
if len(batch) >= batch_size:
|
||||
directions = row.get("RecipeInstructions", [])
|
||||
if isinstance(directions, str):
|
||||
try:
|
||||
directions = json.loads(directions)
|
||||
except Exception:
|
||||
directions = [directions]
|
||||
|
||||
batch.append((
|
||||
str(row.get("RecipeId", "")),
|
||||
str(row.get("Name", ""))[:500],
|
||||
json.dumps(raw_ingredients),
|
||||
json.dumps(ingredient_names),
|
||||
json.dumps([str(d) for d in (directions or [])]),
|
||||
str(row.get("RecipeCategory", "") or ""),
|
||||
json.dumps(list(row.get("Keywords", []) or [])),
|
||||
float(row.get("Calories") or 0) or None,
|
||||
float(row.get("FatContent") or 0) or None,
|
||||
float(row.get("ProteinContent") or 0) or None,
|
||||
float(row.get("SodiumContent") or 0) or None,
|
||||
json.dumps(coverage),
|
||||
))
|
||||
|
||||
if len(batch) >= batch_size:
|
||||
before = conn.total_changes
|
||||
conn.executemany("""
|
||||
INSERT OR IGNORE INTO recipes
|
||||
(external_id, title, ingredients, ingredient_names, directions,
|
||||
category, keywords, calories, fat_g, protein_g, sodium_mg, element_coverage)
|
||||
VALUES (?,?,?,?,?,?,?,?,?,?,?,?)
|
||||
""", batch)
|
||||
conn.commit()
|
||||
inserted += conn.total_changes - before
|
||||
print(f" {inserted} recipes inserted...")
|
||||
batch = []
|
||||
|
||||
if batch:
|
||||
before = conn.total_changes
|
||||
conn.executemany("""
|
||||
INSERT OR IGNORE INTO recipes
|
||||
(external_id, title, ingredients, ingredient_names, directions,
|
||||
|
|
@ -109,21 +130,11 @@ def build(db_path: Path, recipes_path: Path, batch_size: int = 10000) -> None:
|
|||
VALUES (?,?,?,?,?,?,?,?,?,?,?,?)
|
||||
""", batch)
|
||||
conn.commit()
|
||||
inserted += len(batch)
|
||||
print(f" {inserted} recipes inserted...")
|
||||
batch = []
|
||||
inserted += conn.total_changes - before
|
||||
|
||||
if batch:
|
||||
conn.executemany("""
|
||||
INSERT OR IGNORE INTO recipes
|
||||
(external_id, title, ingredients, ingredient_names, directions,
|
||||
category, keywords, calories, fat_g, protein_g, sodium_mg, element_coverage)
|
||||
VALUES (?,?,?,?,?,?,?,?,?,?,?,?)
|
||||
""", batch)
|
||||
conn.commit()
|
||||
inserted += len(batch)
|
||||
|
||||
conn.close()
|
||||
finally:
|
||||
conn.close()
|
||||
print(f"Total: {inserted} recipes inserted")
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -5,8 +5,7 @@ GPL-3.0 source -- derived annotations only, raw pairs not shipped.
|
|||
Usage:
|
||||
conda run -n job-seeker python scripts/pipeline/derive_substitutions.py \
|
||||
--db /path/to/kiwi.db \
|
||||
--recipepairs data/recipepairs.parquet \
|
||||
--recipes data/recipes_foodcom.parquet
|
||||
--recipepairs data/recipepairs.parquet
|
||||
"""
|
||||
from __future__ import annotations
|
||||
import argparse
|
||||
|
|
@ -31,72 +30,73 @@ def diff_ingredients(base: list[str], target: list[str]) -> tuple[list[str], lis
|
|||
return removed, added
|
||||
|
||||
|
||||
def build(db_path: Path, recipepairs_path: Path, recipes_path: Path) -> None:
|
||||
def build(db_path: Path, recipepairs_path: Path) -> None:
|
||||
conn = sqlite3.connect(db_path)
|
||||
try:
|
||||
print("Loading recipe ingredient index...")
|
||||
recipe_ingredients: dict[str, list[str]] = {}
|
||||
for row in conn.execute("SELECT external_id, ingredient_names FROM recipes"):
|
||||
recipe_ingredients[str(row[0])] = json.loads(row[1])
|
||||
|
||||
print("Loading recipe ingredient index...")
|
||||
recipe_ingredients: dict[str, list[str]] = {}
|
||||
for row in conn.execute("SELECT external_id, ingredient_names FROM recipes"):
|
||||
recipe_ingredients[str(row[0])] = json.loads(row[1])
|
||||
df = pd.read_parquet(recipepairs_path)
|
||||
pair_counts: dict[tuple, dict] = defaultdict(lambda: {"count": 0})
|
||||
|
||||
df = pd.read_parquet(recipepairs_path)
|
||||
pair_counts: dict[tuple, dict] = defaultdict(lambda: {"count": 0})
|
||||
print("Diffing recipe pairs...")
|
||||
for _, row in df.iterrows():
|
||||
base_id = str(row.get("base", ""))
|
||||
target_id = str(row.get("target", ""))
|
||||
base_ings = recipe_ingredients.get(base_id, [])
|
||||
target_ings = recipe_ingredients.get(target_id, [])
|
||||
if not base_ings or not target_ings:
|
||||
continue
|
||||
|
||||
print("Diffing recipe pairs...")
|
||||
for _, row in df.iterrows():
|
||||
base_id = str(row.get("base", ""))
|
||||
target_id = str(row.get("target", ""))
|
||||
base_ings = recipe_ingredients.get(base_id, [])
|
||||
target_ings = recipe_ingredients.get(target_id, [])
|
||||
if not base_ings or not target_ings:
|
||||
continue
|
||||
removed, added = diff_ingredients(base_ings, target_ings)
|
||||
if len(removed) != 1 or len(added) != 1:
|
||||
continue
|
||||
|
||||
removed, added = diff_ingredients(base_ings, target_ings)
|
||||
if len(removed) != 1 or len(added) != 1:
|
||||
continue
|
||||
original = removed[0]
|
||||
substitute = added[0]
|
||||
constraints = [c for c in CONSTRAINT_COLS if row.get(c, 0)]
|
||||
for constraint in constraints:
|
||||
key = (original, substitute, constraint)
|
||||
pair_counts[key]["count"] += 1
|
||||
|
||||
original = removed[0]
|
||||
substitute = added[0]
|
||||
constraints = [c for c in CONSTRAINT_COLS if row.get(c, 0)]
|
||||
for constraint in constraints:
|
||||
key = (original, substitute, constraint)
|
||||
pair_counts[key]["count"] += 1
|
||||
def get_profile(name: str) -> dict:
|
||||
row = conn.execute(
|
||||
"SELECT fat_pct, moisture_pct, glutamate_mg, protein_pct "
|
||||
"FROM ingredient_profiles WHERE name = ?", (name,)
|
||||
).fetchone()
|
||||
if row:
|
||||
return {"fat": row[0] or 0, "moisture": row[1] or 0,
|
||||
"glutamate": row[2] or 0, "protein": row[3] or 0}
|
||||
return {"fat": 0, "moisture": 0, "glutamate": 0, "protein": 0}
|
||||
|
||||
def get_profile(name: str) -> dict:
|
||||
row = conn.execute(
|
||||
"SELECT fat_pct, moisture_pct, glutamate_mg, protein_pct "
|
||||
"FROM ingredient_profiles WHERE name = ?", (name,)
|
||||
).fetchone()
|
||||
if row:
|
||||
return {"fat": row[0] or 0, "moisture": row[1] or 0,
|
||||
"glutamate": row[2] or 0, "protein": row[3] or 0}
|
||||
return {"fat": 0, "moisture": 0, "glutamate": 0, "protein": 0}
|
||||
print("Writing substitution pairs...")
|
||||
inserted = 0
|
||||
for (original, substitute, constraint), data in pair_counts.items():
|
||||
if data["count"] < 3:
|
||||
continue
|
||||
p_orig = get_profile(original)
|
||||
p_sub = get_profile(substitute)
|
||||
conn.execute("""
|
||||
INSERT OR REPLACE INTO substitution_pairs
|
||||
(original_name, substitute_name, constraint_label,
|
||||
fat_delta, moisture_delta, glutamate_delta, protein_delta,
|
||||
occurrence_count, source)
|
||||
VALUES (?,?,?,?,?,?,?,?,?)
|
||||
""", (
|
||||
original, substitute, constraint,
|
||||
round(p_sub["fat"] - p_orig["fat"], 2),
|
||||
round(p_sub["moisture"] - p_orig["moisture"], 2),
|
||||
round(p_sub["glutamate"] - p_orig["glutamate"], 2),
|
||||
round(p_sub["protein"] - p_orig["protein"], 2),
|
||||
data["count"], "derived",
|
||||
))
|
||||
inserted += 1
|
||||
|
||||
print("Writing substitution pairs...")
|
||||
inserted = 0
|
||||
for (original, substitute, constraint), data in pair_counts.items():
|
||||
if data["count"] < 3:
|
||||
continue
|
||||
p_orig = get_profile(original)
|
||||
p_sub = get_profile(substitute)
|
||||
conn.execute("""
|
||||
INSERT OR REPLACE INTO substitution_pairs
|
||||
(original_name, substitute_name, constraint_label,
|
||||
fat_delta, moisture_delta, glutamate_delta, protein_delta,
|
||||
occurrence_count, source)
|
||||
VALUES (?,?,?,?,?,?,?,?,?)
|
||||
""", (
|
||||
original, substitute, constraint,
|
||||
round(p_sub["fat"] - p_orig["fat"], 2),
|
||||
round(p_sub["moisture"] - p_orig["moisture"], 2),
|
||||
round(p_sub["glutamate"] - p_orig["glutamate"], 2),
|
||||
round(p_sub["protein"] - p_orig["protein"], 2),
|
||||
data["count"], "derived",
|
||||
))
|
||||
inserted += 1
|
||||
|
||||
conn.commit()
|
||||
conn.close()
|
||||
conn.commit()
|
||||
finally:
|
||||
conn.close()
|
||||
print(f"Inserted {inserted} substitution pairs (min 3 occurrences)")
|
||||
|
||||
|
||||
|
|
@ -104,6 +104,5 @@ if __name__ == "__main__":
|
|||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--db", required=True, type=Path)
|
||||
parser.add_argument("--recipepairs", required=True, type=Path)
|
||||
parser.add_argument("--recipes", required=True, type=Path)
|
||||
args = parser.parse_args()
|
||||
build(args.db, args.recipepairs, args.recipes)
|
||||
build(args.db, args.recipepairs)
|
||||
|
|
|
|||
Loading…
Reference in a new issue