- Maps Purple Carrot parquet columns to recipes table schema: Slug → external_id (pc_<slug>), Name → title, RecipeIngredientParts/RecipeInstructions → ingredients/directions - Sets source='purplecarrot', category='meal-kit', servings=2 - Allergens encoded as allergen:<tag> keywords alongside HIGH-PROTEIN etc. - Handles numpy ndarray columns from parquet (not plain Python lists) - Upserts: insert new, update existing — safe to run repeatedly Wire step 3 (ingest) into weekly_harvest.sh so the full pipeline is: 1. discover_current_menu.py → parquet of active menu slugs 2. scrape_live.py --resume → scrape only new slugs, append to live parquet 3. ingest_purplecarrot.py → upsert into /Library/Assets/kiwi/kiwi.db
218 lines
7.9 KiB
Python
218 lines
7.9 KiB
Python
"""Ingest Purple Carrot scraped recipes into the Kiwi corpus database.
|
|
|
|
Reads recipes_purplecarrot_live.parquet (output of scrape_live.py) and
|
|
upserts into the shared recipes table, setting source='purplecarrot' and
|
|
using the recipe slug as the external_id (prefixed pc_).
|
|
|
|
Run after each weekly_harvest.sh scrape:
|
|
|
|
conda run -n cf python3 scripts/pipeline/ingest_purplecarrot.py \
|
|
[--db /Library/Assets/kiwi/kiwi.db] \
|
|
[--parquet /Library/Assets/kiwi/pipeline/recipes_purplecarrot_live.parquet]
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import sqlite3
|
|
from pathlib import Path
|
|
|
|
import math
|
|
import re
|
|
|
|
import pandas as pd
|
|
|
|
# ── Helpers (inlined from build_recipe_index to avoid cross-module import) ─────
|
|
|
|
_MEASURE_PATTERN = re.compile(
|
|
r"^\d[\d\s/¼½¾⅓⅔]*\s*(cup|tbsp|tsp|oz|lb|g|kg|ml|l|clove|slice|piece|can|pkg|package|bunch|head|stalk|sprig|pinch|dash|to taste|as needed)s?\b",
|
|
re.IGNORECASE,
|
|
)
|
|
_LEAD_NUMBER = re.compile(r"^\d[\d\s/¼½¾⅓⅔]*\s*")
|
|
_TRAILING_QUALIFIER = re.compile(
|
|
r"\s*(to taste|as needed|or more|or less|optional|if desired|if needed)\s*$",
|
|
re.IGNORECASE,
|
|
)
|
|
|
|
|
|
def _float_or_none(val: object) -> float | None:
|
|
try:
|
|
v = float(val) # type: ignore[arg-type]
|
|
return v if v > 0 else None
|
|
except (TypeError, ValueError):
|
|
return None
|
|
|
|
|
|
def _safe_list(val: object) -> list:
|
|
if val is None:
|
|
return []
|
|
if isinstance(val, float) and math.isnan(val):
|
|
return []
|
|
if isinstance(val, list):
|
|
return val
|
|
# Parquet often deserializes list columns as numpy arrays
|
|
try:
|
|
import numpy as np
|
|
if isinstance(val, np.ndarray):
|
|
return val.tolist()
|
|
except ImportError:
|
|
pass
|
|
return []
|
|
|
|
|
|
def _extract_ingredient_names(raw_list: list[str]) -> list[str]:
|
|
names = []
|
|
for raw in raw_list:
|
|
s = raw.lower().strip()
|
|
s = _MEASURE_PATTERN.sub("", s)
|
|
s = _LEAD_NUMBER.sub("", s)
|
|
s = re.sub(r"\(.*?\)", "", s)
|
|
s = re.sub(r",.*$", "", s)
|
|
s = _TRAILING_QUALIFIER.sub("", s)
|
|
s = s.strip(" -.,")
|
|
if s and len(s) > 1:
|
|
names.append(s)
|
|
return names
|
|
|
|
|
|
def _compute_element_coverage(profiles: list[dict]) -> dict[str, float]:
|
|
counts: dict[str, int] = {}
|
|
for p in profiles:
|
|
for elem in p.get("elements", []):
|
|
counts[elem] = counts.get(elem, 0) + 1
|
|
if not profiles:
|
|
return {}
|
|
return {e: round(c / len(profiles), 3) for e, c in counts.items()}
|
|
|
|
# ── Config ─────────────────────────────────────────────────────────────────────
|
|
|
|
DEFAULT_DB = Path("/Library/Assets/kiwi/kiwi.db")
|
|
DEFAULT_PARQUET = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot_live.parquet")
|
|
|
|
|
|
# ── Ingest ─────────────────────────────────────────────────────────────────────
|
|
|
|
def ingest(db_path: Path, parquet_path: Path) -> None:
|
|
df = pd.read_parquet(parquet_path)
|
|
|
|
# Filter to rows with full recipe data
|
|
if "HasFullRecipe" in df.columns:
|
|
df = df[df["HasFullRecipe"] == True].copy()
|
|
|
|
if df.empty:
|
|
print("No full recipes found in parquet — nothing to ingest.")
|
|
return
|
|
|
|
print(f"Ingesting {len(df)} Purple Carrot recipes into {db_path} …")
|
|
|
|
conn = sqlite3.connect(db_path)
|
|
try:
|
|
conn.execute("PRAGMA journal_mode=WAL")
|
|
|
|
# Pre-load ingredient element profiles for coverage calculation
|
|
profile_index: dict[str, list[str]] = {}
|
|
for row in conn.execute("SELECT name, elements FROM ingredient_profiles"):
|
|
try:
|
|
profile_index[row[0]] = json.loads(row[1])
|
|
except Exception:
|
|
pass
|
|
|
|
inserted = updated = 0
|
|
|
|
for _, row in df.iterrows():
|
|
slug = str(row.get("Slug", "")).strip()
|
|
if not slug:
|
|
continue
|
|
|
|
external_id = f"pc_{slug}"
|
|
title = str(row.get("Name", "")).strip()[:500]
|
|
if not title:
|
|
continue
|
|
|
|
raw_ingredients = [str(i) for i in _safe_list(row.get("RecipeIngredientParts", []))]
|
|
directions = [str(d) for d in _safe_list(row.get("RecipeInstructions", []))]
|
|
|
|
ingredient_names = _extract_ingredient_names(raw_ingredients)
|
|
profiles = [
|
|
{"elements": profile_index[n]}
|
|
for n in ingredient_names if n in profile_index
|
|
]
|
|
coverage = _compute_element_coverage(profiles)
|
|
|
|
# Keywords: merge scraped tags with allergen info
|
|
kw_raw = _safe_list(row.get("Keywords", []))
|
|
allergens = str(row.get("Allergens", "") or "")
|
|
if allergens:
|
|
kw_raw = list(kw_raw) + [f"allergen:{a.strip()}" for a in allergens.split(",") if a.strip()]
|
|
keywords_json = json.dumps(kw_raw)
|
|
|
|
# Check if already present (same external_id)
|
|
existing = conn.execute(
|
|
"SELECT id FROM recipes WHERE external_id = ?", (external_id,)
|
|
).fetchone()
|
|
|
|
params = (
|
|
title,
|
|
json.dumps(raw_ingredients),
|
|
json.dumps(ingredient_names),
|
|
json.dumps(directions),
|
|
"meal-kit", # category
|
|
keywords_json,
|
|
_float_or_none(row.get("Calories")),
|
|
_float_or_none(row.get("FatContent")),
|
|
_float_or_none(row.get("ProteinContent")),
|
|
None, # sodium_mg — not scraped
|
|
json.dumps(coverage),
|
|
None, # sugar_g — not scraped
|
|
_float_or_none(row.get("CarbohydrateContent")),
|
|
_float_or_none(row.get("FiberContent")),
|
|
2.0, # servings — PC meal kits are 2-serving by default
|
|
0, # nutrition_estimated — PC provides real data
|
|
)
|
|
|
|
if existing:
|
|
conn.execute("""
|
|
UPDATE recipes
|
|
SET title=?, ingredients=?, ingredient_names=?, directions=?,
|
|
category=?, keywords=?, calories=?, fat_g=?, protein_g=?,
|
|
sodium_mg=?, element_coverage=?,
|
|
sugar_g=?, carbs_g=?, fiber_g=?, servings=?, nutrition_estimated=?
|
|
WHERE external_id=?
|
|
""", params + (external_id,))
|
|
updated += 1
|
|
else:
|
|
conn.execute("""
|
|
INSERT INTO recipes
|
|
(external_id, source, title, ingredients, ingredient_names,
|
|
directions, category, keywords, calories, fat_g, protein_g,
|
|
sodium_mg, element_coverage,
|
|
sugar_g, carbs_g, fiber_g, servings, nutrition_estimated)
|
|
VALUES (?, 'purplecarrot', ?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
|
|
""", (external_id,) + params)
|
|
inserted += 1
|
|
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
print(f"Done — {inserted} inserted, {updated} updated")
|
|
|
|
|
|
# ── Main ───────────────────────────────────────────────────────────────────────
|
|
|
|
def main() -> None:
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("--db", type=Path, default=DEFAULT_DB)
|
|
parser.add_argument("--parquet", type=Path, default=DEFAULT_PARQUET)
|
|
args = parser.parse_args()
|
|
|
|
if not args.parquet.exists():
|
|
print(f"ERROR: parquet not found at {args.parquet}")
|
|
raise SystemExit(1)
|
|
|
|
ingest(args.db, args.parquet)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|