feat(pipeline): ingest_purplecarrot.py — upsert scraped recipes into corpus DB
Some checks failed
CI / Backend (Python) (push) Has been cancelled
CI / Frontend (Vue) (push) Has been cancelled
Mirror / mirror (push) Has been cancelled

- Maps Purple Carrot parquet columns to recipes table schema:
  Slug → external_id (pc_<slug>), Name → title,
  RecipeIngredientParts/RecipeInstructions → ingredients/directions
- Sets source='purplecarrot', category='meal-kit', servings=2
- Allergens encoded as allergen:<tag> keywords alongside HIGH-PROTEIN etc.
- Handles numpy ndarray columns from parquet (not plain Python lists)
- Upserts: insert new, update existing — safe to run repeatedly

Wire step 3 (ingest) into weekly_harvest.sh so the full pipeline is:
  1. discover_current_menu.py → parquet of active menu slugs
  2. scrape_live.py --resume  → scrape only new slugs, append to live parquet
  3. ingest_purplecarrot.py   → upsert into /Library/Assets/kiwi/kiwi.db
This commit is contained in:
pyr0ball 2026-05-21 16:43:23 -07:00
parent 21a0664961
commit 0c200f3148
2 changed files with 224 additions and 0 deletions

View file

@ -0,0 +1,218 @@
"""Ingest Purple Carrot scraped recipes into the Kiwi corpus database.
Reads recipes_purplecarrot_live.parquet (output of scrape_live.py) and
upserts into the shared recipes table, setting source='purplecarrot' and
using the recipe slug as the external_id (prefixed pc_).
Run after each weekly_harvest.sh scrape:
conda run -n cf python3 scripts/pipeline/ingest_purplecarrot.py \
[--db /Library/Assets/kiwi/kiwi.db] \
[--parquet /Library/Assets/kiwi/pipeline/recipes_purplecarrot_live.parquet]
"""
from __future__ import annotations
import argparse
import json
import sqlite3
from pathlib import Path
import math
import re
import pandas as pd
# ── Helpers (inlined from build_recipe_index to avoid cross-module import) ─────
_MEASURE_PATTERN = re.compile(
r"^\d[\d\s/¼½¾⅓⅔]*\s*(cup|tbsp|tsp|oz|lb|g|kg|ml|l|clove|slice|piece|can|pkg|package|bunch|head|stalk|sprig|pinch|dash|to taste|as needed)s?\b",
re.IGNORECASE,
)
_LEAD_NUMBER = re.compile(r"^\d[\d\s/¼½¾⅓⅔]*\s*")
_TRAILING_QUALIFIER = re.compile(
r"\s*(to taste|as needed|or more|or less|optional|if desired|if needed)\s*$",
re.IGNORECASE,
)
def _float_or_none(val: object) -> float | None:
try:
v = float(val) # type: ignore[arg-type]
return v if v > 0 else None
except (TypeError, ValueError):
return None
def _safe_list(val: object) -> list:
if val is None:
return []
if isinstance(val, float) and math.isnan(val):
return []
if isinstance(val, list):
return val
# Parquet often deserializes list columns as numpy arrays
try:
import numpy as np
if isinstance(val, np.ndarray):
return val.tolist()
except ImportError:
pass
return []
def _extract_ingredient_names(raw_list: list[str]) -> list[str]:
names = []
for raw in raw_list:
s = raw.lower().strip()
s = _MEASURE_PATTERN.sub("", s)
s = _LEAD_NUMBER.sub("", s)
s = re.sub(r"\(.*?\)", "", s)
s = re.sub(r",.*$", "", s)
s = _TRAILING_QUALIFIER.sub("", s)
s = s.strip(" -.,")
if s and len(s) > 1:
names.append(s)
return names
def _compute_element_coverage(profiles: list[dict]) -> dict[str, float]:
counts: dict[str, int] = {}
for p in profiles:
for elem in p.get("elements", []):
counts[elem] = counts.get(elem, 0) + 1
if not profiles:
return {}
return {e: round(c / len(profiles), 3) for e, c in counts.items()}
# ── Config ─────────────────────────────────────────────────────────────────────
DEFAULT_DB = Path("/Library/Assets/kiwi/kiwi.db")
DEFAULT_PARQUET = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot_live.parquet")
# ── Ingest ─────────────────────────────────────────────────────────────────────
def ingest(db_path: Path, parquet_path: Path) -> None:
df = pd.read_parquet(parquet_path)
# Filter to rows with full recipe data
if "HasFullRecipe" in df.columns:
df = df[df["HasFullRecipe"] == True].copy()
if df.empty:
print("No full recipes found in parquet — nothing to ingest.")
return
print(f"Ingesting {len(df)} Purple Carrot recipes into {db_path}")
conn = sqlite3.connect(db_path)
try:
conn.execute("PRAGMA journal_mode=WAL")
# Pre-load ingredient element profiles for coverage calculation
profile_index: dict[str, list[str]] = {}
for row in conn.execute("SELECT name, elements FROM ingredient_profiles"):
try:
profile_index[row[0]] = json.loads(row[1])
except Exception:
pass
inserted = updated = 0
for _, row in df.iterrows():
slug = str(row.get("Slug", "")).strip()
if not slug:
continue
external_id = f"pc_{slug}"
title = str(row.get("Name", "")).strip()[:500]
if not title:
continue
raw_ingredients = [str(i) for i in _safe_list(row.get("RecipeIngredientParts", []))]
directions = [str(d) for d in _safe_list(row.get("RecipeInstructions", []))]
ingredient_names = _extract_ingredient_names(raw_ingredients)
profiles = [
{"elements": profile_index[n]}
for n in ingredient_names if n in profile_index
]
coverage = _compute_element_coverage(profiles)
# Keywords: merge scraped tags with allergen info
kw_raw = _safe_list(row.get("Keywords", []))
allergens = str(row.get("Allergens", "") or "")
if allergens:
kw_raw = list(kw_raw) + [f"allergen:{a.strip()}" for a in allergens.split(",") if a.strip()]
keywords_json = json.dumps(kw_raw)
# Check if already present (same external_id)
existing = conn.execute(
"SELECT id FROM recipes WHERE external_id = ?", (external_id,)
).fetchone()
params = (
title,
json.dumps(raw_ingredients),
json.dumps(ingredient_names),
json.dumps(directions),
"meal-kit", # category
keywords_json,
_float_or_none(row.get("Calories")),
_float_or_none(row.get("FatContent")),
_float_or_none(row.get("ProteinContent")),
None, # sodium_mg — not scraped
json.dumps(coverage),
None, # sugar_g — not scraped
_float_or_none(row.get("CarbohydrateContent")),
_float_or_none(row.get("FiberContent")),
2.0, # servings — PC meal kits are 2-serving by default
0, # nutrition_estimated — PC provides real data
)
if existing:
conn.execute("""
UPDATE recipes
SET title=?, ingredients=?, ingredient_names=?, directions=?,
category=?, keywords=?, calories=?, fat_g=?, protein_g=?,
sodium_mg=?, element_coverage=?,
sugar_g=?, carbs_g=?, fiber_g=?, servings=?, nutrition_estimated=?
WHERE external_id=?
""", params + (external_id,))
updated += 1
else:
conn.execute("""
INSERT INTO recipes
(external_id, source, title, ingredients, ingredient_names,
directions, category, keywords, calories, fat_g, protein_g,
sodium_mg, element_coverage,
sugar_g, carbs_g, fiber_g, servings, nutrition_estimated)
VALUES (?, 'purplecarrot', ?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
""", (external_id,) + params)
inserted += 1
conn.commit()
finally:
conn.close()
print(f"Done — {inserted} inserted, {updated} updated")
# ── Main ───────────────────────────────────────────────────────────────────────
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument("--db", type=Path, default=DEFAULT_DB)
parser.add_argument("--parquet", type=Path, default=DEFAULT_PARQUET)
args = parser.parse_args()
if not args.parquet.exists():
print(f"ERROR: parquet not found at {args.parquet}")
raise SystemExit(1)
ingest(args.db, args.parquet)
if __name__ == "__main__":
main()

View file

@ -31,5 +31,11 @@ conda run -n cf python3 scripts/pipeline/purple_carrot/scrape_live.py \
--resume \
--delay 3.0 2>&1 | tee -a "$LOG"
# Step 3: ingest new recipes into the shared corpus DB
echo "[3/3] Ingesting into corpus DB..." | tee -a "$LOG"
conda run -n cf python3 scripts/pipeline/ingest_purplecarrot.py \
--parquet "$LIVE_OUT" \
--db /Library/Assets/kiwi/kiwi.db 2>&1 | tee -a "$LOG"
echo "=== Done $(date -u '+%Y-%m-%d %H:%M UTC') ===" >> "$LOG"
echo "" >> "$LOG"