From 7cad503b3518f0e9f4258d9b7a0f52f237dc5faf Mon Sep 17 00:00:00 2001
From: pyr0ball
Date: Sun, 17 May 2026 09:16:35 -0700
Subject: [PATCH] feat(pipeline): Purple Carrot recipe corpus scraper via Wayback Machine
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

discover_wayback.py — enumerates recipe slugs from archived menu API
(/api/v2/menus/) and product API (/api/v1/products/*) plus recipe-category
HTML pages. Writes incremental JSONL manifest to
/Library/Assets/kiwi/pipeline/pc_slugs.jsonl.

scrape_recipes.py — fetches full recipe data per slug using three-tier
fallback: product API JSON (oldest captures first), HTML inline state
(__NEXT_DATA__ / __INITIAL_STATE__), and JSON-LD structured data. Outputs
recipes_purplecarrot.parquet in food.com columnar format so
build_recipe_index.py imports it unchanged. Includes SourceURL column for
recipe attribution UI (kiwi#139). Checkpoints every 50 recipes.

Initial discovery: 158 slugs from menu 1536 + product_api pass. Re-run
discover_wayback.py after archive.org stabilizes to pick up older slugs from
recipe-category pages.

Backlog: live Playwright scraper for post-Wayback recipes (kiwi#137).
---
 scripts/pipeline/purple_carrot/__init__.py    |   0
 .../purple_carrot/discover_wayback.py         | 302 ++++++++++
 .../pipeline/purple_carrot/scrape_recipes.py  | 547 ++++++++++++++++++
 3 files changed, 849 insertions(+)
 create mode 100644 scripts/pipeline/purple_carrot/__init__.py
 create mode 100644 scripts/pipeline/purple_carrot/discover_wayback.py
 create mode 100644 scripts/pipeline/purple_carrot/scrape_recipes.py

diff --git a/scripts/pipeline/purple_carrot/__init__.py b/scripts/pipeline/purple_carrot/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/scripts/pipeline/purple_carrot/discover_wayback.py b/scripts/pipeline/purple_carrot/discover_wayback.py
new file mode 100644
index 0000000..1c32bf1
--- /dev/null
+++ b/scripts/pipeline/purple_carrot/discover_wayback.py
@@ -0,0 +1,302 @@
+"""
+discover_wayback.py — enumerate Purple Carrot recipe slugs via the Wayback Machine.
+
+Strategy:
+  1. CDX API → all archived /api/v2/menus/* URLs (multiple timestamps)
+  2. Replay → fetch each menu's menuItems, extract productPath slugs
+  3. CDX API → all archived /api/v1/products/* URLs (direct slug capture)
+  4. CDX API → /recipe-categories/* HTML pages for older slugs
+  5. Deduplicate and write manifest to OUT_FILE
+
+Output (JSONL, one record per recipe):
+  {"slug": "...", "title": "...", "subtitle": "...", "cook_time": "...",
+   "tags": [...], "serving_size": 2, "image_url": "...", "description": "...",
+   "wayback_ts": "20260412150557", "source": "menu|product_api|category_page"}
+
+Usage:
+    conda run -n cf python -m scripts.pipeline.purple_carrot.discover_wayback
+    conda run -n cf python -m scripts.pipeline.purple_carrot.discover_wayback --out /Library/Assets/kiwi/pipeline/pc_slugs.jsonl
+"""
+from __future__ import annotations
+
+import argparse
+import json
+import logging
+import time
+from pathlib import Path
+from typing import Any
+from urllib.parse import urlencode
+
+import requests
+
+logger = logging.getLogger(__name__)
+
+CDX_BASE = "http://web.archive.org/cdx/search/cdx"
+WB_BASE = "https://web.archive.org/web"
+PC_HOST = "www.purplecarrot.com"
+
+# Polite delay between Wayback replay fetches (seconds)
+REPLAY_DELAY = 1.0
+CDX_DELAY = 0.5
+
+DEFAULT_OUT = Path("/Library/Assets/kiwi/pipeline/pc_slugs.jsonl")
+
+
+# ── CDX helpers ───────────────────────────────────────────────────────────────
+
+def cdx_query(url_pattern: str, **kwargs) -> list[dict]:
+    """Run a CDX search and return a list of result dicts."""
+    params = {
+        "url": url_pattern,
+        "output": "json",
+        "fl": "original,timestamp,statuscode",
+        "collapse": "urlkey",
+        "filter": "statuscode:200",
+        **kwargs,
+    }
+    for attempt in range(3):
+        try:
+            resp = requests.get(CDX_BASE, params=params, timeout=30)
+            resp.raise_for_status()
+            rows = resp.json()
+            if not rows or len(rows) < 2:
+                return []
+            headers = rows[0]
+            return [dict(zip(headers, row)) for row in rows[1:]]
+        except Exception as exc:
+            logger.warning("CDX attempt %d failed: %s", attempt + 1, exc)
+            time.sleep(2 ** attempt)
+    return []
+
+
+def wayback_get(url: str, timestamp: str) -> Any | None:
+    """Fetch a Wayback replay of a URL and return parsed JSON (or None)."""
+    replay_url = f"{WB_BASE}/{timestamp}/{url}"
+    for attempt in range(3):
+        try:
+            resp = requests.get(replay_url, timeout=30)
+            if resp.status_code == 200:
+                return resp.json()
+            if resp.status_code == 404:
+                return None
+        except Exception as exc:
+            logger.warning("Wayback GET attempt %d failed for %s: %s", attempt + 1, url, exc)
+        time.sleep(2 ** attempt)
+    return None
+
+
+# ── Slug extraction ───────────────────────────────────────────────────────────
+
+def slug_from_product_path(path: str) -> str | None:
+    """'/recipe/foo-bar-baz' → 'foo-bar-baz'."""
+    if not path:
+        return None
+    return path.strip("/").split("/")[-1] or None
+
+
+def _menu_item_to_record(item: dict, wayback_ts: str) -> dict | None:
+    """Map one menuItems entry to a manifest record; None when it has no slug."""
+    slug = slug_from_product_path(item.get("productPath", ""))
+    if not slug:
+        return None
+    return {
+        "slug": slug,
+        "title": item.get("title", ""),
+        "subtitle": item.get("subtitle", ""),
+        "cook_time": item.get("cookTime", ""),
+        "tags": item.get("filterTags") or [],
+        "serving_size": item.get("servingSize"),
+        "image_url": item.get("imageURL", ""),
+        "description": item.get("description", ""),
+        "wayback_ts": wayback_ts,
+        "source": "menu",
+    }
+
+
+# ── Discovery passes ──────────────────────────────────────────────────────────
+
+def pass_menus(seen_slugs: set[str]) -> list[dict]:
+    """Walk all archived /api/v2/menus/* captures to extract slugs."""
+    records: list[dict] = []
+
+    # Find all distinct archived menu URLs
+    menu_cdx = cdx_query(f"{PC_HOST}/api/v2/menus/*", limit="500")
+    logger.info("CDX: %d archived menu URLs found", len(menu_cdx))
+    time.sleep(CDX_DELAY)
+
+    processed_menu_ids: set[str] = set()
+
+    for entry in menu_cdx:
+        url = entry["original"]
+        ts = entry["timestamp"]
+
+        # Skip the listing endpoint, only process individual menus
+        menu_id = url.split("?")[0].rstrip("/").split("/")[-1]
+        if not menu_id.isdigit():
+            continue
+
+        if menu_id in processed_menu_ids:
+            continue
+        processed_menu_ids.add(menu_id)
+
+        logger.info("Fetching menu %s (ts=%s) ...", menu_id, ts)
+        data = wayback_get(url.split("?")[0] + "?logged_out=true", ts)
+        time.sleep(REPLAY_DELAY)
+
+        if not data or "menuItems" not in data:
+            continue
+
+        for item in data["menuItems"]:
+            rec = _menu_item_to_record(item, ts)
+            if rec and rec["slug"] not in seen_slugs:
+                seen_slugs.add(rec["slug"])
+                records.append(rec)
+                logger.debug(" + %s", rec["slug"])
+
+        logger.info(" %d new slugs (total so far: %d)", len(records), len(seen_slugs))
+
+    return records
+
+
+def pass_product_api(seen_slugs: set[str]) -> list[dict]:
+    """Pick up any directly archived /api/v1/products/* URLs the menu pass missed."""
+    records: list[dict] = []
+
+    product_cdx = cdx_query(f"{PC_HOST}/api/v1/products/*", limit="5000")
+    logger.info("CDX: %d archived product API URLs found", len(product_cdx))
+    time.sleep(CDX_DELAY)
+
+    for entry in product_cdx:
+        # Drop any query string first so "foo?x=1" and "foo" map to the same slug
+        slug = entry["original"].split("?")[0].rstrip("/").split("/")[-1]
+        if not slug or slug in seen_slugs:
+            continue
+        seen_slugs.add(slug)
+        records.append({
+            "slug": slug,
+            "title": "",
+            "subtitle": "",
+            "cook_time": "",
+            "tags": [],
+            "serving_size": None,
+            "image_url": "",
+            "description": "",
+            "wayback_ts": entry["timestamp"],
+            "source": "product_api",
+        })
+
+    logger.info("product_api pass: %d new slugs", len(records))
+    return records
+
+
+def pass_category_pages(seen_slugs: set[str]) -> list[dict]:
+    """Parse archived recipe-categories HTML pages for slugs not in the API.
+
+    Category pages are rendered SSR/with inline JSON state on older captures,
+    so we do a simple regex scan for /recipe/ patterns.
+    """
+    import re
+
+    records: list[dict] = []
+    SLUG_RE = re.compile(r'["\s]/recipe/([a-z0-9][a-z0-9\-]{3,})["\s/?]')
+
+    cat_cdx = cdx_query(f"{PC_HOST}/recipe-categories/*", limit="200")
+    logger.info("CDX: %d archived category pages found", len(cat_cdx))
+    time.sleep(CDX_DELAY)
+
+    seen_category_urls: set[str] = set()
+
+    for entry in cat_cdx:
+        url = entry["original"].split("?")[0]
+        if url in seen_category_urls:
+            continue
+        seen_category_urls.add(url)
+
+        replay_url = f"{WB_BASE}/{entry['timestamp']}/{url}"
+        try:
+            resp = requests.get(replay_url, timeout=30)
+            time.sleep(REPLAY_DELAY)
+            if resp.status_code != 200:
+                continue
+        except Exception as exc:
+            logger.warning("Category page fetch failed: %s", exc)
+            continue
+
+        for slug in SLUG_RE.findall(resp.text):
+            if slug in seen_slugs:
+                continue
+            seen_slugs.add(slug)
+            records.append({
+                "slug": slug,
+                "title": "",
+                "subtitle": "",
+                "cook_time": "",
+                "tags": [],
+                "serving_size": None,
+                "image_url": "",
+                "description": "",
+                "wayback_ts": entry["timestamp"],
+                "source": "category_page",
+            })
+
+    logger.info("category_pages pass: %d new slugs", len(records))
+    return records
+
+
+# ── Main ──────────────────────────────────────────────────────────────────────
+
+def discover(out_file: Path) -> None:
+    """Run every discovery pass and append newly found slugs to *out_file*."""
+    seen: set[str] = set()
+
+    # Load previously discovered slugs so reruns are incremental
+    existing: list[dict] = []
+    if out_file.exists():
+        with open(out_file, encoding="utf-8") as f:
+            for line in f:
+                line = line.strip()
+                if line:
+                    rec = json.loads(line)
+                    seen.add(rec["slug"])
+                    existing.append(rec)
+        logger.info("Loaded %d existing slugs from %s", len(seen), out_file)
+
+    new_records: list[dict] = []
+    new_records += pass_menus(seen)
+    new_records += pass_product_api(seen)
+    new_records += pass_category_pages(seen)
+
+    out_file.parent.mkdir(parents=True, exist_ok=True)
+    with open(out_file, "a", encoding="utf-8") as f:
+        for rec in new_records:
+            f.write(json.dumps(rec) + "\n")
+
+    total = len(existing) + len(new_records)
+    logger.info(
+        "Done. %d new slugs written to %s (%d total).",
+        len(new_records), out_file, total,
+    )
+
+
+def main() -> None:
+    """Parse CLI arguments, configure logging, and run discovery."""
+    parser = argparse.ArgumentParser(description="Discover Purple Carrot recipe slugs via Wayback")
+    parser.add_argument(
+        "--out",
+        type=Path,
+        default=DEFAULT_OUT,
+        help=f"Output JSONL manifest (default: {DEFAULT_OUT})",
+    )
+    parser.add_argument("--debug", action="store_true")
+    args = parser.parse_args()
+
+    logging.basicConfig(
+        level=logging.DEBUG if args.debug else logging.INFO,
+        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
+    )
+
+    discover(args.out)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/scripts/pipeline/purple_carrot/scrape_recipes.py b/scripts/pipeline/purple_carrot/scrape_recipes.py
new file mode 100644
index 0000000..72672d1
--- /dev/null
+++ b/scripts/pipeline/purple_carrot/scrape_recipes.py
@@ -0,0 +1,547 @@
+"""
+scrape_recipes.py — fetch full recipe data for slugs in pc_slugs.jsonl.
+
+For each slug:
+  1. Try Wayback /api/v1/products/ — oldest capture first (pre-HelloFresh
+     acquisition data is more complete).
+  2. If instructions are empty, try the recipe HTML page via Wayback and parse
+     inline JSON state or structured markup.
+  3. Merge with metadata already in the manifest (title, tags, cook_time, etc.)
+  4. Emit one row per recipe to recipes_purplecarrot.parquet in food.com columnar
+     format so build_recipe_index.py can import it unchanged.
+
+Output columns (food.com schema + PC extras ignored by the indexer):
+  RecipeId, Name, Subtitle, RecipeIngredientParts, RecipeInstructions,
+  RecipeCategory, Keywords, Calories, FatContent, ProteinContent,
+  SodiumContent, SugarContent, CarbohydrateContent, FiberContent,
+  RecipeServings, Description, ImageURL, CookTime, Slug, Source, SourceURL,
+  HasFullRecipe, WaybackTs, input
+
+Usage:
+    conda run -n cf python -m scripts.pipeline.purple_carrot.scrape_recipes
+    conda run -n cf python -m scripts.pipeline.purple_carrot.scrape_recipes \\
+        --slugs /Library/Assets/kiwi/pipeline/pc_slugs.jsonl \\
+        --out /Library/Assets/kiwi/pipeline/recipes_purplecarrot.parquet \\
+        --resume
+"""
+from __future__ import annotations
+
+import argparse
+import json
+import logging
+import re
+import time
+from pathlib import Path
+from typing import Any
+
+import requests
+
+logger = logging.getLogger(__name__)
+
+CDX_BASE = "http://web.archive.org/cdx/search/cdx"
+WB_BASE = "https://web.archive.org/web"
+PC_HOST = "www.purplecarrot.com"
+
+REPLAY_DELAY = 1.2
+CDX_DELAY = 0.5
+
+DEFAULT_SLUGS = Path("/Library/Assets/kiwi/pipeline/pc_slugs.jsonl")
+DEFAULT_OUT = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot.parquet")
+
+# Inline JSON state embedded by the SSR renderer — used as fallback HTML parser
+_NEXT_DATA_RE = re.compile(r'<script id="__NEXT_DATA__"[^>]*>(.*?)</script>', re.DOTALL)
+_REDUX_STATE_RE = re.compile(r'window\.__INITIAL_STATE__\s*=\s*(\{.*?\});\s*\n', re.DOTALL)
+
+
+# ── Wayback helpers ───────────────────────────────────────────────────────────
+
+def _cdx_timestamps(slug: str) -> list[str]:
+    """Return all captured timestamps for a product slug, oldest first."""
+    url = f"{PC_HOST}/api/v1/products/{slug}"
+    try:
+        resp = requests.get(
+            CDX_BASE,
+            params={
+                "url": url,
+                "output": "json",
+                "fl": "timestamp,statuscode",
+                "filter": "statuscode:200",
+                "limit": "20",
+            },
+            timeout=20,
+        )
+        resp.raise_for_status()
+        rows = resp.json()
+        if len(rows) < 2:
+            return []
+        return [row[0] for row in rows[1:]]  # timestamps only, oldest first
+    except Exception as exc:
+        logger.debug("CDX timestamps failed for %s: %s", slug, exc)
+        return []
+
+
+def _wayback_json(url: str, timestamp: str) -> Any | None:
+    """Fetch a Wayback replay of *url* and return parsed JSON; None after 3 failed tries."""
+    replay = f"{WB_BASE}/{timestamp}/{url}"
+    for attempt in range(3):
+        try:
+            resp = requests.get(replay, timeout=30)
+            if resp.status_code == 200:
+                return resp.json()
+            if resp.status_code in (404, 410):
+                return None
+        except Exception as exc:
+            logger.debug("Wayback JSON attempt %d failed (%s): %s", attempt + 1, url, exc)
+        time.sleep(2 ** attempt)
+    return None
+
+
+def _wayback_html(url: str, timestamp: str) -> str | None:
+    """Fetch a Wayback replay of *url* and return the body text; None after 3 failed tries."""
+    replay = f"{WB_BASE}/{timestamp}/{url}"
+    for attempt in range(3):
+        try:
+            resp = requests.get(replay, timeout=30)
+            if resp.status_code == 200:
+                return resp.text
+            if resp.status_code in (404, 410):
+                return None
+        except Exception as exc:
+            logger.debug("Wayback HTML attempt %d failed (%s): %s", attempt + 1, url, exc)
+        time.sleep(2 ** attempt)
+    return None
+
+
+# ── Recipe extraction from API JSON ──────────────────────────────────────────
+
+def _extract_from_api(data: dict) -> dict | None:
+    """Parse a /api/v1/products/ response into our recipe dict.
+
+    Returns None if the response has no usable content (empty title, etc.).
+    Returns a partial dict if only some fields are populated — caller merges
+    with manifest metadata.
+    """
+    if not data or not isinstance(data, dict):
+        return None
+
+    title = (data.get("title") or "").strip()
+    subtitle = (data.get("subtitle") or "").strip()
+    slug = data.get("slug") or ""
+
+    skus = data.get("skus") or []
+    sku = skus[0] if skus else {}
+
+    # Instructions: list of {step_number, title, description}
+    raw_instructions = sku.get("instructions") or []
+    steps: list[str] = []
+    for step in sorted(raw_instructions, key=lambda s: s.get("step_number", 0)):
+        parts = []
+        if step.get("title"):
+            parts.append(step["title"])
+        if step.get("description"):
+            parts.append(step["description"])
+        if parts:
+            steps.append(". ".join(parts))
+
+    # Ingredients: may be in ingredients_quantity or ingredients
+    raw_ingr = sku.get("ingredients_quantity") or sku.get("ingredients") or []
+    ingredients: list[str] = []
+    for item in raw_ingr:
+        if isinstance(item, dict):
+            qty = item.get("quantity") or item.get("qty") or ""
+            unit = item.get("unit") or ""
+            # Top-level "name" wins, then nested ingredient.name, then ingredient_name
+            nested = item.get("ingredient")
+            nested_name = nested.get("name", "") if isinstance(nested, dict) else ""
+            name = item.get("name") or nested_name or item.get("ingredient_name", "")
+            raw = item.get("raw") or item.get("display_name") or ""
+            line = raw or " ".join(filter(None, [str(qty), str(unit), str(name)])).strip()
+            if line:
+                ingredients.append(line)
+        elif isinstance(item, str) and item.strip():
+            ingredients.append(item.strip())
+
+    nutrition = sku.get("nutrition_label") or {}
+    calories = _num(nutrition.get("calories") or sku.get("calories"))
+    fat = _num(nutrition.get("total_fat") or sku.get("fat"))
+    protein = _num(nutrition.get("protein") or sku.get("protein"))
+    sodium = _num(nutrition.get("sodium") or sku.get("sodium"))
+    sugar = _num(nutrition.get("sugar") or nutrition.get("total_sugars"))
+    carbs = _num(nutrition.get("total_carbohydrate") or sku.get("carbs"))
+    fiber = _num(nutrition.get("dietary_fiber") or sku.get("fiber"))
+
+    tags = sku.get("tags") or data.get("tags") or []
+    category = sku.get("meal_type") or sku.get("product_type") or ""
+    servings = _num(sku.get("servings"))
+
+    cook_time = sku.get("prep_and_cook_time") or ""
+    description = sku.get("description") or ""
+
+    images = sku.get("hero_images") or sku.get("image_versions") or []
+    image_url = ""
+    if images and isinstance(images[0], dict):
+        image_url = images[0].get("image_url") or images[0].get("url") or ""
+    if not image_url and data.get("square_image"):
+        sq = data["square_image"]
+        image_url = sq.get("url") if isinstance(sq, dict) else ""
+
+    return {
+        "slug": slug,
+        "title": title,
+        "subtitle": subtitle,
+        "steps": steps,
+        "ingredients": ingredients,
+        "category": category,
+        "tags": tags,
+        "calories": calories,
+        "fat": fat,
+        "protein": protein,
+        "sodium": sodium,
+        "sugar": sugar,
+        "carbs": carbs,
+        "fiber": fiber,
+        "servings": servings,
+        "cook_time": cook_time,
+        "description": description,
+        "image_url": image_url,
+        "has_full_recipe": bool(steps and ingredients),
+    }
+
+
+def _num(val: Any) -> float | None:
+    """Parse the leading number of *val* (e.g. "12g", "340 mg", 7); None if absent or <= 0."""
+    if val is None:
+        return None
+    try:
+        # Strip "mg" before "g": the reverse order turns "340mg" into "340m"
+        v = float(str(val).replace("mg", "").replace("g", "").split()[0])
+        return v if v > 0 else None
+    except (ValueError, IndexError):
+        return None
+
+
+# ── Fallback: HTML inline state parsing ──────────────────────────────────────
+
+def _extract_from_html(html: str, slug: str) -> dict | None:
+    """Try to pull recipe data from inline JS state in older SSR pages."""
+    # Attempt 1: Next.js __NEXT_DATA__
+    m = _NEXT_DATA_RE.search(html)
+    if m:
+        try:
+            state = json.loads(m.group(1))
+            # Walk the Next.js page props tree looking for recipe data
+            props = state.get("props", {}).get("pageProps", {})
+            recipe = props.get("recipe") or props.get("product")
+            if recipe and isinstance(recipe, dict) and recipe.get("title"):
+                return _extract_from_api(recipe)
+        except Exception as exc:
+            logger.debug("[%s] __NEXT_DATA__ parse failed: %s", slug, exc)
+
+    # Attempt 2: Redux __INITIAL_STATE__
+    m = _REDUX_STATE_RE.search(html)
+    if m:
+        try:
+            state = json.loads(m.group(1))
+            # Try common Redux state shapes
+            for key in ("recipe", "product", "currentRecipe", "currentProduct"):
+                recipe = state.get(key)
+                if recipe and isinstance(recipe, dict) and recipe.get("title"):
+                    return _extract_from_api(recipe)
+        except Exception as exc:
+            logger.debug("[%s] __INITIAL_STATE__ parse failed: %s", slug, exc)
+
+    # Attempt 3: JSON-LD structured data
+    ld_matches = re.findall(
+        r'<script[^>]+type=["\']application/ld\+json["\'][^>]*>(.*?)</script>',
+        html, re.DOTALL
+    )
+    for raw in ld_matches:
+        try:
+            ld = json.loads(raw)
+            if isinstance(ld, list):
+                ld = next((x for x in ld if x.get("@type") == "Recipe"), None)
+            if not ld or ld.get("@type") != "Recipe":
+                continue
+            steps = []
+            for inst in (ld.get("recipeInstructions") or []):
+                if isinstance(inst, dict):
+                    steps.append(inst.get("text", ""))
+                elif isinstance(inst, str):
+                    steps.append(inst)
+            steps = [s for s in steps if s]
+            ingredients = [i for i in (ld.get("recipeIngredient") or []) if i]
+            return {
+                "slug": slug,
+                "title": ld.get("name", ""),
+                "subtitle": "",
+                "steps": steps,
+                "ingredients": ingredients,
+                "category": ld.get("recipeCategory", ""),
+                "tags": [t.strip() for t in ld["keywords"].split(",") if t.strip()] if isinstance(ld.get("keywords"), str) else [],
+                "calories": _num((ld.get("nutrition") or {}).get("calories")),
+                "fat": None, "protein": None, "sodium": None,
+                "sugar": None, "carbs": None, "fiber": None,
+                "servings": _num(ld.get("recipeYield")),
+                "cook_time": str(ld.get("totalTime") or ld.get("cookTime") or ""),
+                "description": ld.get("description", ""),
+                "image_url": (ld["image"][0] if isinstance(ld.get("image"), list) else ld.get("image", "")) or "",
+                "has_full_recipe": bool(steps and ingredients),
+            }
+        except Exception as exc:
+            logger.debug("[%s] JSON-LD parse failed: %s", slug, exc)
+
+    return None
+
+
+# ── Per-slug fetch ─────────────────────────────────────────────────────────────
+
+def fetch_recipe(slug: str, manifest_meta: dict) -> dict | None:
+    """Fetch the fullest available recipe data for a slug from Wayback.
+
+    Returns a merged dict of manifest metadata + API/HTML-extracted content.
+    """
+    api_url = f"https://{PC_HOST}/api/v1/products/{slug}"
+    html_url = f"https://{PC_HOST}/recipe/{slug}"
+
+    recipe: dict | None = None
+
+    # Try product API — oldest captures are most likely to have full data
+    timestamps = _cdx_timestamps(slug)
+    time.sleep(CDX_DELAY)
+
+    if not timestamps and manifest_meta.get("wayback_ts"):
+        timestamps = [manifest_meta["wayback_ts"]]
+
+    for ts in timestamps:
+        data = _wayback_json(api_url, ts)
+        time.sleep(REPLAY_DELAY)
+        if not data:
+            continue
+        candidate = _extract_from_api(data)
+        if not candidate:
+            continue
+        recipe = candidate
+        if recipe.get("has_full_recipe"):
+            logger.debug("[%s] Full recipe from API (ts=%s)", slug, ts)
+            break
+        logger.debug("[%s] Partial API data (ts=%s) — trying HTML fallback", slug, ts)
+
+    # HTML fallback when API has no steps/ingredients
+    if not recipe or not recipe.get("has_full_recipe"):
+        html_cdx_url = f"{PC_HOST}/recipe/{slug}"
+        try:
+            html_resp = requests.get(
+                CDX_BASE,
+                params={
+                    "url": html_cdx_url,
+                    "output": "json",
+                    "fl": "timestamp,statuscode",
+                    "filter": "statuscode:200",
+                    "limit": "5",
+                },
+                timeout=20,
+            )
+            html_ts_rows = html_resp.json() if html_resp.ok else []
+            html_timestamps = [row[0] for row in html_ts_rows[1:]] if len(html_ts_rows) > 1 else []
+        except Exception:
+            html_timestamps = []
+        time.sleep(CDX_DELAY)
+
+        for ts in html_timestamps:
+            html = _wayback_html(html_url, ts)
+            time.sleep(REPLAY_DELAY)
+            if not html:
+                continue
+            html_recipe = _extract_from_html(html, slug)
+            if html_recipe and html_recipe.get("has_full_recipe"):
+                logger.debug("[%s] Full recipe from HTML (ts=%s)", slug, ts)
+                recipe = html_recipe
+                break
+
+    # Build merged record: manifest metadata fills any gaps from API/HTML
+    merged: dict = {
+        "slug": slug,
+        "title": manifest_meta.get("title", ""),
+        "subtitle": manifest_meta.get("subtitle", ""),
+        "steps": [],
+        "ingredients": [],
+        "category": "",
+        "tags": manifest_meta.get("tags") or [],
+        "calories": None,
+        "fat": None,
+        "protein": None,
+        "sodium": None,
+        "sugar": None,
+        "carbs": None,
+        "fiber": None,
+        "servings": manifest_meta.get("serving_size"),
+        "cook_time": manifest_meta.get("cook_time", ""),
+        "description": manifest_meta.get("description", ""),
+        "image_url": manifest_meta.get("image_url", ""),
+        "source": "purple_carrot",
+        "wayback_ts": manifest_meta.get("wayback_ts", ""),
+        "has_full_recipe": False,
+    }
+
+    if recipe:
+        for key, val in recipe.items():
+            if key == "slug":
+                # Identity comes from the manifest so resume can match scraped rows
+                continue
+            # Prefer API/HTML data; keep manifest value only when API field is empty
+            if val or key not in merged or not merged[key]:
+                merged[key] = val
+
+    if not merged["title"]:
+        logger.warning("[%s] No title — skipping", slug)
+        return None
+
+    return merged
+
+
+# ── Output formatting ─────────────────────────────────────────────────────────
+
+def _to_dataframe_row(r: dict) -> dict:
+    """Convert merged recipe dict to food.com-compatible parquet row."""
+    # Build plain-text input for allrecipes-style corpus compatibility
+    lines = [r["title"]]
+    if r.get("subtitle"):
+        lines.append(r["subtitle"])
+    if r.get("description"):
+        lines.append("")
+        lines.append(r["description"])
+    if r.get("ingredients"):
+        lines += ["", "Ingredients:"] + [f"- {i}" for i in r["ingredients"]]
+    if r.get("steps"):
+        lines += ["", "Directions:"] + [f"- {s}" for s in r["steps"]]
+    plain_text = "\n".join(lines)
+
+    source_url = f"https://www.purplecarrot.com/recipe/{r['slug']}"
+
+    return {
+        # food.com schema columns (used by build_recipe_index.py)
+        "RecipeId": f"pc_{r['slug']}",
+        "Name": r["title"],
+        "RecipeIngredientParts": r.get("ingredients") or [],
+        "RecipeInstructions": r.get("steps") or [],
+        "RecipeCategory": r.get("category", ""),
+        "Keywords": r.get("tags") or [],
+        "Calories": r.get("calories"),
+        "FatContent": r.get("fat"),
+        "ProteinContent": r.get("protein"),
+        "SodiumContent": r.get("sodium"),
+        "SugarContent": r.get("sugar"),
+        "CarbohydrateContent": r.get("carbs"),
+        "FiberContent": r.get("fiber"),
+        "RecipeServings": r.get("servings"),
+        # PC-specific extras (ignored by indexer, used by training pipeline)
+        "Subtitle": r.get("subtitle", ""),
+        "Description": r.get("description", ""),
+        "ImageURL": r.get("image_url", ""),
+        "CookTime": r.get("cook_time", ""),
+        "Slug": r["slug"],
+        "Source": "purple_carrot",
+        "SourceURL": source_url,  # canonical attribution link shown in recipe UI
+        "HasFullRecipe": r.get("has_full_recipe", False),
+        "WaybackTs": r.get("wayback_ts", ""),
+        # Also emit plain-text input for allrecipes-compatible corpus search
+        "input": plain_text,
+    }
+
+
+# ── Main ──────────────────────────────────────────────────────────────────────
+
+def scrape(slugs_file: Path, out_file: Path, resume: bool = True) -> None:
+    """Scrape every slug in *slugs_file* into *out_file*, checkpointing every 50 recipes."""
+    import pandas as pd
+
+    # Load manifest
+    if not slugs_file.exists():
+        logger.error("Slugs manifest not found: %s", slugs_file)
+        return
+
+    manifest: dict[str, dict] = {}
+    with open(slugs_file, encoding="utf-8") as f:
+        for line in f:
+            line = line.strip()
+            if line:
+                rec = json.loads(line)
+                slug = rec["slug"]
+                # Keep the richest metadata if slug appears from multiple sources
+                if slug not in manifest or rec.get("source") == "menu":
+                    manifest[slug] = rec
+
+    logger.info("Manifest: %d unique slugs", len(manifest))
+
+    # Load already-scraped slugs for resume
+    done_slugs: set[str] = set()
+    existing_rows: list[dict] = []
+    if resume and out_file.exists():
+        try:
+            existing_df = pd.read_parquet(out_file)
+            done_slugs = set(existing_df["Slug"].tolist())
+            existing_rows = existing_df.to_dict("records")
+            logger.info("Resume: %d already scraped", len(done_slugs))
+        except Exception as exc:
+            logger.warning("Could not load existing parquet for resume: %s", exc)
+
+    todo = [s for s in manifest if s not in done_slugs]
+    logger.info("%d slugs to fetch", len(todo))
+
+    rows = list(existing_rows)
+    for i, slug in enumerate(todo, 1):
+        logger.info("[%d/%d] %s", i, len(todo), slug)
+        recipe = fetch_recipe(slug, manifest[slug])
+        if recipe:
+            rows.append(_to_dataframe_row(recipe))
+            status = "full" if recipe.get("has_full_recipe") else "partial"
+            logger.info(" -> %s (%s)", recipe.get("title", "?"), status)
+        else:
+            logger.warning(" -> skipped (no title)")
+
+        # Write checkpoint every 50 recipes
+        if i % 50 == 0:
+            _write_parquet(rows, out_file)
+            logger.info("Checkpoint: %d recipes written", len(rows))
+
+    _write_parquet(rows, out_file)
+    full = sum(1 for r in rows if r.get("HasFullRecipe"))
+    logger.info(
+        "Done. %d recipes written to %s (%d full, %d partial).",
+        len(rows), out_file, full, len(rows) - full,
+    )
+
+
+def _write_parquet(rows: list[dict], out_file: Path) -> None:
+    """Write *rows* to *out_file* as parquet, creating parent directories as needed."""
+    import pandas as pd
+    out_file.parent.mkdir(parents=True, exist_ok=True)
+    pd.DataFrame(rows).to_parquet(out_file, index=False)
+
+
+def main() -> None:
+    """Parse CLI arguments, configure logging, and run the scraper."""
+    parser = argparse.ArgumentParser(description="Scrape Purple Carrot recipes from Wayback")
+    parser.add_argument("--slugs", type=Path, default=DEFAULT_SLUGS)
+    parser.add_argument("--out", type=Path, default=DEFAULT_OUT)
+    parser.add_argument(
+        "--resume", dest="resume", action="store_true", default=True,
+        help="Resume from an existing parquet (default)",
+    )
+    parser.add_argument(
+        "--no-resume", dest="resume", action="store_false",
+        help="Start fresh (ignore existing parquet)",
+    )
+    parser.add_argument("--debug", action="store_true")
+    args = parser.parse_args()
+
+    logging.basicConfig(
+        level=logging.DEBUG if args.debug else logging.INFO,
+        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
+    )
+
+    scrape(args.slugs, args.out, resume=args.resume)
+
+
+if __name__ == "__main__":
+    main()