From 0c200f3148a10a506e36f467e5c0b4336848ee54 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Thu, 21 May 2026 16:43:23 -0700 Subject: [PATCH] =?UTF-8?q?feat(pipeline):=20ingest=5Fpurplecarrot.py=20?= =?UTF-8?q?=E2=80=94=20upsert=20scraped=20recipes=20into=20corpus=20DB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Maps Purple Carrot parquet columns to recipes table schema: Slug → external_id (pc_), Name → title, RecipeIngredientParts/RecipeInstructions → ingredients/directions - Sets source='purplecarrot', category='meal-kit', servings=2 - Allergens encoded as allergen: keywords alongside HIGH-PROTEIN etc. - Handles numpy ndarray columns from parquet (not plain Python lists) - Upserts: insert new, update existing — safe to run repeatedly Wire step 3 (ingest) into weekly_harvest.sh so the full pipeline is: 1. discover_current_menu.py → parquet of active menu slugs 2. scrape_live.py --resume → scrape only new slugs, append to live parquet 3. ingest_purplecarrot.py → upsert into /Library/Assets/kiwi/kiwi.db --- scripts/pipeline/ingest_purplecarrot.py | 218 ++++++++++++++++++ .../pipeline/purple_carrot/weekly_harvest.sh | 6 + 2 files changed, 224 insertions(+) create mode 100644 scripts/pipeline/ingest_purplecarrot.py diff --git a/scripts/pipeline/ingest_purplecarrot.py b/scripts/pipeline/ingest_purplecarrot.py new file mode 100644 index 0000000..dc83265 --- /dev/null +++ b/scripts/pipeline/ingest_purplecarrot.py @@ -0,0 +1,218 @@ +"""Ingest Purple Carrot scraped recipes into the Kiwi corpus database. + +Reads recipes_purplecarrot_live.parquet (output of scrape_live.py) and +upserts into the shared recipes table, setting source='purplecarrot' and +using the recipe slug as the external_id (prefixed pc_). + +Run after each weekly_harvest.sh scrape: + + conda run -n cf python3 scripts/pipeline/ingest_purplecarrot.py \ + [--db /Library/Assets/kiwi/kiwi.db] \ + [--parquet /Library/Assets/kiwi/pipeline/recipes_purplecarrot_live.parquet] +""" + +from __future__ import annotations + +import argparse +import json +import sqlite3 +from pathlib import Path + +import math +import re + +import pandas as pd + +# ── Helpers (inlined from build_recipe_index to avoid cross-module import) ───── + +_MEASURE_PATTERN = re.compile( + r"^\d[\d\s/¼½¾⅓⅔]*\s*(cup|tbsp|tsp|oz|lb|g|kg|ml|l|clove|slice|piece|can|pkg|package|bunch|head|stalk|sprig|pinch|dash|to taste|as needed)s?\b", + re.IGNORECASE, +) +_LEAD_NUMBER = re.compile(r"^\d[\d\s/¼½¾⅓⅔]*\s*") +_TRAILING_QUALIFIER = re.compile( + r"\s*(to taste|as needed|or more|or less|optional|if desired|if needed)\s*$", + re.IGNORECASE, +) + + +def _float_or_none(val: object) -> float | None: + try: + v = float(val) # type: ignore[arg-type] + return v if v > 0 else None + except (TypeError, ValueError): + return None + + +def _safe_list(val: object) -> list: + if val is None: + return [] + if isinstance(val, float) and math.isnan(val): + return [] + if isinstance(val, list): + return val + # Parquet often deserializes list columns as numpy arrays + try: + import numpy as np + if isinstance(val, np.ndarray): + return val.tolist() + except ImportError: + pass + return [] + + +def _extract_ingredient_names(raw_list: list[str]) -> list[str]: + names = [] + for raw in raw_list: + s = raw.lower().strip() + s = _MEASURE_PATTERN.sub("", s) + s = _LEAD_NUMBER.sub("", s) + s = re.sub(r"\(.*?\)", "", s) + s = re.sub(r",.*$", "", s) + s = _TRAILING_QUALIFIER.sub("", s) + s = s.strip(" -.,") + if s and len(s) > 1: + names.append(s) + return names + + +def _compute_element_coverage(profiles: list[dict]) -> dict[str, float]: + counts: dict[str, int] = {} + for p in profiles: + for elem in p.get("elements", []): + counts[elem] = counts.get(elem, 0) + 1 + if not profiles: + return {} + return {e: round(c / len(profiles), 3) for e, c in counts.items()} + +# ── Config ───────────────────────────────────────────────────────────────────── + +DEFAULT_DB = Path("/Library/Assets/kiwi/kiwi.db") +DEFAULT_PARQUET = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot_live.parquet") + + +# ── Ingest ───────────────────────────────────────────────────────────────────── + +def ingest(db_path: Path, parquet_path: Path) -> None: + df = pd.read_parquet(parquet_path) + + # Filter to rows with full recipe data + if "HasFullRecipe" in df.columns: + df = df[df["HasFullRecipe"] == True].copy() + + if df.empty: + print("No full recipes found in parquet — nothing to ingest.") + return + + print(f"Ingesting {len(df)} Purple Carrot recipes into {db_path} …") + + conn = sqlite3.connect(db_path) + try: + conn.execute("PRAGMA journal_mode=WAL") + + # Pre-load ingredient element profiles for coverage calculation + profile_index: dict[str, list[str]] = {} + for row in conn.execute("SELECT name, elements FROM ingredient_profiles"): + try: + profile_index[row[0]] = json.loads(row[1]) + except Exception: + pass + + inserted = updated = 0 + + for _, row in df.iterrows(): + slug = str(row.get("Slug", "")).strip() + if not slug: + continue + + external_id = f"pc_{slug}" + title = str(row.get("Name", "")).strip()[:500] + if not title: + continue + + raw_ingredients = [str(i) for i in _safe_list(row.get("RecipeIngredientParts", []))] + directions = [str(d) for d in _safe_list(row.get("RecipeInstructions", []))] + + ingredient_names = _extract_ingredient_names(raw_ingredients) + profiles = [ + {"elements": profile_index[n]} + for n in ingredient_names if n in profile_index + ] + coverage = _compute_element_coverage(profiles) + + # Keywords: merge scraped tags with allergen info + kw_raw = _safe_list(row.get("Keywords", [])) + allergens = str(row.get("Allergens", "") or "") + if allergens: + kw_raw = list(kw_raw) + [f"allergen:{a.strip()}" for a in allergens.split(",") if a.strip()] + keywords_json = json.dumps(kw_raw) + + # Check if already present (same external_id) + existing = conn.execute( + "SELECT id FROM recipes WHERE external_id = ?", (external_id,) + ).fetchone() + + params = ( + title, + json.dumps(raw_ingredients), + json.dumps(ingredient_names), + json.dumps(directions), + "meal-kit", # category + keywords_json, + _float_or_none(row.get("Calories")), + _float_or_none(row.get("FatContent")), + _float_or_none(row.get("ProteinContent")), + None, # sodium_mg — not scraped + json.dumps(coverage), + None, # sugar_g — not scraped + _float_or_none(row.get("CarbohydrateContent")), + _float_or_none(row.get("FiberContent")), + 2.0, # servings — PC meal kits are 2-serving by default + 0, # nutrition_estimated — PC provides real data + ) + + if existing: + conn.execute(""" + UPDATE recipes + SET title=?, ingredients=?, ingredient_names=?, directions=?, + category=?, keywords=?, calories=?, fat_g=?, protein_g=?, + sodium_mg=?, element_coverage=?, + sugar_g=?, carbs_g=?, fiber_g=?, servings=?, nutrition_estimated=? + WHERE external_id=? + """, params + (external_id,)) + updated += 1 + else: + conn.execute(""" + INSERT INTO recipes + (external_id, source, title, ingredients, ingredient_names, + directions, category, keywords, calories, fat_g, protein_g, + sodium_mg, element_coverage, + sugar_g, carbs_g, fiber_g, servings, nutrition_estimated) + VALUES (?, 'purplecarrot', ?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) + """, (external_id,) + params) + inserted += 1 + + conn.commit() + finally: + conn.close() + + print(f"Done — {inserted} inserted, {updated} updated") + + +# ── Main ─────────────────────────────────────────────────────────────────────── + +def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument("--db", type=Path, default=DEFAULT_DB) + parser.add_argument("--parquet", type=Path, default=DEFAULT_PARQUET) + args = parser.parse_args() + + if not args.parquet.exists(): + print(f"ERROR: parquet not found at {args.parquet}") + raise SystemExit(1) + + ingest(args.db, args.parquet) + + +if __name__ == "__main__": + main() diff --git a/scripts/pipeline/purple_carrot/weekly_harvest.sh b/scripts/pipeline/purple_carrot/weekly_harvest.sh index 74bf6c8..9b45937 100755 --- a/scripts/pipeline/purple_carrot/weekly_harvest.sh +++ b/scripts/pipeline/purple_carrot/weekly_harvest.sh @@ -31,5 +31,11 @@ conda run -n cf python3 scripts/pipeline/purple_carrot/scrape_live.py \ --resume \ --delay 3.0 2>&1 | tee -a "$LOG" +# Step 3: ingest new recipes into the shared corpus DB +echo "[3/3] Ingesting into corpus DB..." | tee -a "$LOG" +conda run -n cf python3 scripts/pipeline/ingest_purplecarrot.py \ + --parquet "$LIVE_OUT" \ + --db /Library/Assets/kiwi/kiwi.db 2>&1 | tee -a "$LOG" + echo "=== Done $(date -u '+%Y-%m-%d %H:%M UTC') ===" >> "$LOG" echo "" >> "$LOG"