fix: pipeline scripts — connection safety, remove unused recipes_path arg, fix inserted counter, pre-load profile index

This commit is contained in:
pyr0ball 2026-03-30 23:10:52 -07:00
parent ba6766b1d9
commit 0e1eae9a90
2 changed files with 125 additions and 115 deletions

View file

@ -55,7 +55,17 @@ def compute_element_coverage(profiles: list[dict]) -> dict[str, float]:
def build(db_path: Path, recipes_path: Path, batch_size: int = 10000) -> None: def build(db_path: Path, recipes_path: Path, batch_size: int = 10000) -> None:
conn = sqlite3.connect(db_path) conn = sqlite3.connect(db_path)
try:
conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA journal_mode=WAL")
# Pre-load ingredient element profiles to avoid N+1 queries
profile_index: dict[str, list[str]] = {}
for row in conn.execute("SELECT name, elements FROM ingredient_profiles"):
try:
profile_index[row[0]] = json.loads(row[1])
except Exception:
pass
df = pd.read_parquet(recipes_path) df = pd.read_parquet(recipes_path)
inserted = 0 inserted = 0
batch = [] batch = []
@ -72,11 +82,8 @@ def build(db_path: Path, recipes_path: Path, batch_size: int = 10000) -> None:
profiles = [] profiles = []
for name in ingredient_names: for name in ingredient_names:
row_p = conn.execute( if name in profile_index:
"SELECT elements FROM ingredient_profiles WHERE name = ?", (name,) profiles.append({"elements": profile_index[name]})
).fetchone()
if row_p:
profiles.append({"elements": json.loads(row_p[0])})
coverage = compute_element_coverage(profiles) coverage = compute_element_coverage(profiles)
directions = row.get("RecipeInstructions", []) directions = row.get("RecipeInstructions", [])
@ -102,6 +109,7 @@ def build(db_path: Path, recipes_path: Path, batch_size: int = 10000) -> None:
)) ))
if len(batch) >= batch_size: if len(batch) >= batch_size:
before = conn.total_changes
conn.executemany(""" conn.executemany("""
INSERT OR IGNORE INTO recipes INSERT OR IGNORE INTO recipes
(external_id, title, ingredients, ingredient_names, directions, (external_id, title, ingredients, ingredient_names, directions,
@ -109,11 +117,12 @@ def build(db_path: Path, recipes_path: Path, batch_size: int = 10000) -> None:
VALUES (?,?,?,?,?,?,?,?,?,?,?,?) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)
""", batch) """, batch)
conn.commit() conn.commit()
inserted += len(batch) inserted += conn.total_changes - before
print(f" {inserted} recipes inserted...") print(f" {inserted} recipes inserted...")
batch = [] batch = []
if batch: if batch:
before = conn.total_changes
conn.executemany(""" conn.executemany("""
INSERT OR IGNORE INTO recipes INSERT OR IGNORE INTO recipes
(external_id, title, ingredients, ingredient_names, directions, (external_id, title, ingredients, ingredient_names, directions,
@ -121,8 +130,10 @@ def build(db_path: Path, recipes_path: Path, batch_size: int = 10000) -> None:
VALUES (?,?,?,?,?,?,?,?,?,?,?,?) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)
""", batch) """, batch)
conn.commit() conn.commit()
inserted += len(batch) inserted += conn.total_changes - before
conn.commit()
finally:
conn.close() conn.close()
print(f"Total: {inserted} recipes inserted") print(f"Total: {inserted} recipes inserted")

View file

@ -5,8 +5,7 @@ GPL-3.0 source -- derived annotations only, raw pairs not shipped.
Usage: Usage:
conda run -n job-seeker python scripts/pipeline/derive_substitutions.py \ conda run -n job-seeker python scripts/pipeline/derive_substitutions.py \
--db /path/to/kiwi.db \ --db /path/to/kiwi.db \
--recipepairs data/recipepairs.parquet \ --recipepairs data/recipepairs.parquet
--recipes data/recipes_foodcom.parquet
""" """
from __future__ import annotations from __future__ import annotations
import argparse import argparse
@ -31,9 +30,9 @@ def diff_ingredients(base: list[str], target: list[str]) -> tuple[list[str], lis
return removed, added return removed, added
def build(db_path: Path, recipepairs_path: Path, recipes_path: Path) -> None: def build(db_path: Path, recipepairs_path: Path) -> None:
conn = sqlite3.connect(db_path) conn = sqlite3.connect(db_path)
try:
print("Loading recipe ingredient index...") print("Loading recipe ingredient index...")
recipe_ingredients: dict[str, list[str]] = {} recipe_ingredients: dict[str, list[str]] = {}
for row in conn.execute("SELECT external_id, ingredient_names FROM recipes"): for row in conn.execute("SELECT external_id, ingredient_names FROM recipes"):
@ -96,6 +95,7 @@ def build(db_path: Path, recipepairs_path: Path, recipes_path: Path) -> None:
inserted += 1 inserted += 1
conn.commit() conn.commit()
finally:
conn.close() conn.close()
print(f"Inserted {inserted} substitution pairs (min 3 occurrences)") print(f"Inserted {inserted} substitution pairs (min 3 occurrences)")
@ -104,6 +104,5 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("--db", required=True, type=Path) parser.add_argument("--db", required=True, type=Path)
parser.add_argument("--recipepairs", required=True, type=Path) parser.add_argument("--recipepairs", required=True, type=Path)
parser.add_argument("--recipes", required=True, type=Path)
args = parser.parse_args() args = parser.parse_args()
build(args.db, args.recipepairs, args.recipes) build(args.db, args.recipepairs)