Compare commits

..

2 commits

Author SHA1 Message Date
a9ab996bcc feat(pipeline): purple carrot weekly menu scraper with CF bypass
Some checks are pending
CI / Backend (Python) (push) Waiting to run
CI / Frontend (Vue) (push) Waiting to run
Mirror / mirror (push) Waiting to run
Add three new scripts for Purple Carrot recipe pipeline:

- discover_current_menu.py: fetches this week's active menu slugs from
  /plant-based-recipes using requests (server-rendered HTML, no JS needed).
  Accumulates slugs across weekly runs for building a recipe corpus over time.

- discover_slugs_categories.py: crawls recipe-category listing pages with
  ?page=N pagination to discover historical slug inventory. Note: category
  archive slugs (past menu items) 404 when scraped live; only use for
  identifying currently-featured recipes per category.

- scrape_live.py: updated with --slugs-from flag (load slug inventory from
  any parquet, not just the default Wayback one) and fresh-context-per-slug
  pattern to bypass Cloudflare session-level bot detection (which fires on
  the 2nd+ request in a shared browser context).

Discovery: the live site only renders full ingredient/instruction content for
recipes currently on the active weekly menu. 23/23 current menu recipes
scraped successfully (100% hit rate vs ~1% for archived slugs).
2026-05-21 16:16:32 -07:00
56f942b3fd feat(pipeline): Purple Carrot scraper hardening + shared pipeline logging
scrape_recipes.py:
- Switch CDX to HTTPS (avoids HTTP 503 rate-limit bucket)
- Restrict product API CDX to 2019–2021 window (pre-HelloFresh instruction stripping)
- Replace inline CDX requests with _cdx_get() helper: retries on 429/503 with
  exponential backoff (15s, 30s, 60s, 120s)
- Increase HTML fallback CDX limit from 5 to 10 timestamps
- Bump CDX_DELAY 0.5s → 3.0s and REPLAY_DELAY 1.2s → 2.0s (polite scraping)
- Fix KeyError: 0 on hero_images dict (normalise dict to list before indexing)

discover_wayback.py:
- Switch CDX to HTTPS

scripts/pipeline/log_utils.py (new):
- attach_pipeline_log(script_name): adds a JSON FileHandler to the root logger
  writing to /Library/Assets/logs/pipeline/<script>_<ts>.jsonl for Avocet
  Turnstone training data ingestion (kiwi#141 / avocet#67)
2026-05-17 13:35:35 -07:00
6 changed files with 710 additions and 42 deletions

View file

@ -0,0 +1,68 @@
"""
Pipeline logging utility.
Adds a structured JSON FileHandler to the root logger so every pipeline
script automatically writes machine-readable logs to the shared datastore
at /Library/Assets/logs/pipeline/. Avocet ingests these for Turnstone
logreading training (kiwi#141 / avocet#67).
Usage (add near the top of main() after logging.basicConfig):
from scripts.pipeline.log_utils import attach_pipeline_log
attach_pipeline_log("scrape_recipes")
"""
from __future__ import annotations
import json
import logging
import os
from datetime import datetime, timezone
from pathlib import Path
PIPELINE_LOG_DIR = Path(
os.environ.get("PIPELINE_LOG_DIR", "/Library/Assets/logs/pipeline")
)
class _JsonFormatter(logging.Formatter):
    """Render each log record as a single-line JSON document.

    The payload always carries ``ts`` (UTC ISO-8601 time built from
    ``record.created``), ``level``, ``logger`` and ``msg``.  ``exc`` is added
    when the record has exception info, and ``extra`` collects every
    attribute on the record that logging itself did not put there, i.e. the
    values passed through ``logger.info("...", extra={...})``.
    """

    # Attributes that the logging machinery sets on a LogRecord; anything else
    # found on the record is reported under "extra".  "asctime" belongs here
    # because a formatter using "%(asctime)s" on another handler stores it on
    # the shared record, and it would otherwise leak into "extra".
    _STANDARD_ATTRS = frozenset({
        "name", "msg", "args", "levelname", "levelno", "pathname",
        "filename", "module", "exc_info", "exc_text", "stack_info",
        "lineno", "funcName", "created", "msecs", "relativeCreated",
        "thread", "threadName", "processName", "process", "message",
        "asctime", "taskName",
    })

    def format(self, record: logging.LogRecord) -> str:
        """Return *record* serialised as one JSON object (no trailing newline)."""
        payload: dict = {
            "ts": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "msg": record.getMessage(),
        }
        if record.exc_info:
            payload["exc"] = self.formatException(record.exc_info)
        # Any extra kwargs passed via logger.info("...", extra={...})
        extra = {
            key: value
            for key, value in record.__dict__.items()
            if key not in self._STANDARD_ATTRS
        }
        if extra:
            payload["extra"] = extra
        # default=str is deliberate: an extra that is not JSON-native (Path,
        # datetime, ...) is stringified instead of making json.dumps raise,
        # which would lose the whole log line.
        return json.dumps(payload, default=str)
def attach_pipeline_log(script_name: str) -> Path:
    """Attach a JSON file handler to the root logger for pipeline logging.

    Creates ``PIPELINE_LOG_DIR`` if it does not exist, opens a new
    ``<script_name>_<UTC timestamp>.jsonl`` file inside it, registers a
    DEBUG-level handler formatted by ``_JsonFormatter`` on the root logger,
    and emits one INFO record announcing the file.

    Returns the path of the log file created.
    """
    PIPELINE_LOG_DIR.mkdir(parents=True, exist_ok=True)

    stamp = datetime.now(tz=timezone.utc).strftime("%Y%m%dT%H%M%S")
    log_path = PIPELINE_LOG_DIR / f"{script_name}_{stamp}.jsonl"

    file_handler = logging.FileHandler(log_path, encoding="utf-8")
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(_JsonFormatter())

    root_logger = logging.getLogger()
    root_logger.addHandler(file_handler)

    module_logger = logging.getLogger(__name__)
    module_logger.info("Pipeline log: %s", log_path, extra={"script": script_name})
    return log_path

View file

@ -0,0 +1,120 @@
"""Discover Purple Carrot's current weekly menu recipe slugs.
The main /plant-based-recipes listing page always renders the current week's
menu as server-side HTML. This script pulls those slugs and writes them to a
parquet that can be passed directly to scrape_live.py via --slugs-from.
Run weekly (e.g. via cron) to accumulate new recipes as the menu rotates.
Usage:
conda run -n cf python3 scripts/pipeline/purple_carrot/discover_current_menu.py \
[--out /Library/Assets/kiwi/pipeline/recipes_purplecarrot_menu.parquet]
Then scrape:
conda run -n cf python3 scripts/pipeline/purple_carrot/scrape_live.py \
--slugs-from /Library/Assets/kiwi/pipeline/recipes_purplecarrot_menu.parquet \
--out /Library/Assets/kiwi/pipeline/recipes_purplecarrot_live.parquet \
--resume
"""
from __future__ import annotations
import re
import sys
from datetime import date
from pathlib import Path
import pandas as pd
import requests
from bs4 import BeautifulSoup
# ── Config ─────────────────────────────────────────────────────────────────────
LISTING_URL = "https://www.purplecarrot.com/plant-based-recipes"
BASE_URL = "https://www.purplecarrot.com/recipe/{slug}"
DEFAULT_OUT = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot_menu.parquet")
HEADERS = {
"User-Agent": (
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
}
RECIPE_HREF_RE = re.compile(r"/recipe/([^?#]+)")
# ── Main ───────────────────────────────────────────────────────────────────────
def discover_current_slugs() -> list[str]:
    """Fetch the listing page and return unique recipe slugs from the current menu.

    Slugs are returned in the order they first appear on the page.  An empty
    list is returned (after printing the reason to stderr) when the request
    fails or the page does not answer with HTTP 200, so the caller only has
    to handle "no slugs".
    """
    try:
        resp = requests.get(LISTING_URL, headers=HEADERS, timeout=15)
    except requests.RequestException as exc:
        # Network-level failures (DNS, timeout, reset) are reported the same
        # way as a bad status code instead of escaping as a traceback.
        print(f"ERROR: could not fetch listing page: {exc}", file=sys.stderr)
        return []
    if resp.status_code != 200:
        print(f"ERROR: listing page returned HTTP {resp.status_code}", file=sys.stderr)
        return []

    soup = BeautifulSoup(resp.text, "html.parser")
    # A dict keeps insertion order, which gives an ordered de-duplication.
    unique: dict[str, None] = {}
    for anchor in soup.find_all("a", href=RECIPE_HREF_RE):
        match = RECIPE_HREF_RE.search(anchor["href"])
        if not match:
            continue
        # "/recipe/foo/" and "/recipe/foo" point at the same recipe.
        slug = match.group(1).rstrip("/")
        if slug:
            unique.setdefault(slug, None)
    return list(unique)
def main() -> None:
    """CLI entry point: record this week's menu slugs in the accumulating parquet.

    Exits with status 1 when no slugs were discovered.  Otherwise the new
    rows are appended to any parquet already at ``--out`` (the earliest row
    per slug wins), the file is rewritten, and a follow-up scrape command is
    printed.
    """
    import argparse

    cli = argparse.ArgumentParser()
    cli.add_argument("--out", type=Path, default=DEFAULT_OUT)
    args = cli.parse_args()

    print(f"Fetching current menu from {LISTING_URL}")
    slugs = discover_current_slugs()
    if not slugs:
        print("No slugs found — the listing page may have changed structure or blocked the request.")
        sys.exit(1)

    discovered_on = date.today().isoformat()
    menu_df = pd.DataFrame(
        [
            {
                "Slug": slug,
                "SourceURL": BASE_URL.format(slug=slug),
                "Source": "purplecarrot_menu",
                "DiscoveredDate": discovered_on,
            }
            for slug in slugs
        ]
    )

    # Merge with any existing menu parquet (accumulate weeks); keep="first"
    # preserves the row from the run that first saw each slug.
    args.out.parent.mkdir(parents=True, exist_ok=True)
    if args.out.exists():
        previous = pd.read_parquet(args.out)
        stacked = pd.concat([previous, menu_df], ignore_index=True)
        menu_df = stacked.drop_duplicates(subset=["Slug"], keep="first")
    menu_df.to_parquet(args.out, index=False)

    print(f"Found {len(slugs)} current-menu slugs this week:")
    for slug in slugs:
        print(f" {slug}")
    print(f"\nSaved {len(menu_df)} total slugs (accumulated) to {args.out}")

    print("\nTo scrape full recipes:")
    print(" conda run -n cf python3 scripts/pipeline/purple_carrot/scrape_live.py \\")
    print(f" --slugs-from {args.out} \\")
    print(" --out /Library/Assets/kiwi/pipeline/recipes_purplecarrot_live.parquet \\")
    print(" --resume")
if __name__ == "__main__":
main()

View file

@ -0,0 +1,218 @@
"""Discover Purple Carrot recipe slugs by crawling all recipe-category listing pages.
The site serves full server-rendered HTML for category pages, paginated via
?page=N. Each page loads 18 recipe cards. This script crawls every category
across all pages and writes a deduplicated slug inventory.
Usage:
conda run -n cf python3 scripts/pipeline/purple_carrot/discover_slugs_categories.py \
[--out /Library/Assets/kiwi/pipeline/recipes_purplecarrot_slugs.parquet] \
[--delay 2.0] \
[--max-pages 50] # safety cap per category (comfort-foods has ~18)
"""
from __future__ import annotations
import argparse
import re
import time
from pathlib import Path
from typing import Any
import pandas as pd
import requests
from bs4 import BeautifulSoup
# ── Config ─────────────────────────────────────────────────────────────────────
BASE = "https://www.purplecarrot.com"
# All known category slugs (from /plant-based-recipes nav)
CATEGORIES: list[str] = [
"comfort-foods",
"family-friendly",
"healthy-desserts",
"holiday-recipes",
"quick-and-easy",
"party-foods",
"seasonal-menu",
"spring-recipes",
"summer-recipes",
"fall-recipes",
"winter-recipes",
"african",
"american",
"asian",
"comfort",
"french",
"indian",
"italian",
"mediterranean",
"mexican",
"middle-eastern",
"soups",
"salads",
"bowls",
"pasta",
"sandwiches-wraps",
"tacos",
"breakfast",
"snacks-sides",
]
DEFAULT_OUT = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot_slugs.parquet")
EXISTING_PARQUET = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot.parquet")
RECIPE_LINK_SELECTOR = "a.c-recipe__title"
SLUG_RE = re.compile(r"/recipe/([^?#]+)")
HEADERS = {
"User-Agent": (
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
}
# ── Helpers ────────────────────────────────────────────────────────────────────
def _fetch_html(url: str, session: requests.Session) -> str | None:
    """Fetch *url* through *session* and return the HTML string, or None on failure.

    Only an HTTP 200 yields text.  A 404 returns None silently because the
    crawler treats it as the end of pagination; any other status, or a
    request-level failure, is printed before returning None.
    """
    try:
        resp = session.get(url, headers=HEADERS, timeout=15)
    except requests.RequestException as exc:
        print(f" ERROR fetching {url}: {exc}")
        return None

    if resp.status_code == 200:
        return resp.text
    if resp.status_code != 404:
        # 404 is the expected end of pagination, so only other codes are reported.
        print(f" HTTP {resp.status_code} for {url}")
    return None
def _extract_slugs(html: str) -> list[str]:
    """Return the recipe slugs linked from one listing page, in document order.

    Every element matching ``RECIPE_LINK_SELECTOR`` is inspected; links whose
    ``href`` does not match ``SLUG_RE`` are ignored.  Duplicates are kept.
    """
    document = BeautifulSoup(html, "html.parser")
    matches = (
        SLUG_RE.search(link.get("href", ""))
        for link in document.select(RECIPE_LINK_SELECTOR)
    )
    return [found.group(1) for found in matches if found]
def _get_category_total(html: str) -> int | None:
    """Try to parse the recipe count shown on the category page (e.g. '319 Recipes').

    The first '<number> Recipe(s)' occurrence in *html* is used.  Numbers
    written with thousands separators ('1,234 Recipes') are read as a whole
    rather than as their last group of digits.

    Returns None when no such text is present.
    """
    found = re.search(r"(\d{1,3}(?:,\d{3})+|\d+)\s+Recipes?\b", html)
    if found is None:
        return None
    return int(found.group(1).replace(",", ""))
def _discover_category(
    category: str,
    session: requests.Session,
    delay: float,
    max_pages: int,
    page_size: int = 18,
) -> tuple[list[str], int]:
    """Crawl all pages of a category, return (slugs, pages_fetched).

    Pages are requested in order (page 1 without a query string, later pages
    with ``?page=N``) until one of: the fetch fails or 404s, a page yields no
    recipe links, a page yields fewer than *page_size* links (treated as the
    last page), or *max_pages* is reached.  The crawl sleeps *delay* seconds
    between page requests.

    ``pages_fetched`` counts the pages that yielded at least one recipe
    link.  Slugs are returned in crawl order and may contain duplicates.
    """
    slugs: list[str] = []
    pages_fetched = 0
    category_url = f"{BASE}/recipe-categories/{category}"

    for page_num in range(1, max_pages + 1):
        url = category_url if page_num == 1 else f"{category_url}?page={page_num}"
        html = _fetch_html(url, session)
        if html is None:
            break  # 404 or error = past the end

        page_slugs = _extract_slugs(html)
        # The category total is only looked up on the first page.
        total = _get_category_total(html) if page_num == 1 else None

        if not page_slugs:
            # Show total if we got a page but no links (category slug may be wrong)
            if total is not None:
                print(f" page 1 loaded (total={total}) but 0 recipe links — selector may need updating")
            break

        pages_fetched += 1
        slugs.extend(page_slugs)

        # Print progress
        total_str = f" / {total}" if total else ""
        print(f" page {page_num}: +{len(page_slugs)} slugs ({len(slugs)}{total_str} cumulative)")

        if len(page_slugs) < page_size:
            # Short page = last page
            break
        time.sleep(delay)

    return slugs, pages_fetched
# ── Main ───────────────────────────────────────────────────────────────────────
def main() -> None:
    """CLI entry point: crawl category listings and write the slug inventory parquet.

    Crawls each requested category (all of ``CATEGORIES`` by default),
    de-duplicates slugs keeping the first category in which each was seen,
    then appends slugs that appear only in ``EXISTING_PARQUET`` (the Wayback
    run) before writing ``--out``.  Nothing is written when the crawl found
    no records.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--out", type=Path, default=DEFAULT_OUT)
    parser.add_argument("--delay", type=float, default=2.0,
                        help="Seconds between page requests")
    parser.add_argument("--max-pages", type=int, default=50,
                        help="Safety cap on pages per category")
    parser.add_argument("--categories", nargs="*",
                        help="Crawl only these category slugs (default: all)")
    args = parser.parse_args()
    categories = args.categories or CATEGORIES

    # Seed with any slugs from the Wayback parquet
    known_slugs: set[str] = set()
    if EXISTING_PARQUET.exists():
        df_wb = pd.read_parquet(EXISTING_PARQUET)
        known_slugs = set(df_wb["Slug"].dropna().tolist())
        print(f"Seeded with {len(known_slugs)} slugs from Wayback parquet")

    all_records: list[dict[str, Any]] = []
    # The context manager closes the session's connections when the crawl ends.
    with requests.Session() as session:
        for category in categories:
            print(f"\n[{category}]")
            cat_slugs, pages = _discover_category(category, session, args.delay, args.max_pages)
            all_records.extend(
                {"Slug": slug, "Category": category, "Source": "purplecarrot_category"}
                for slug in cat_slugs
            )
            print(f"{len(cat_slugs)} slugs across ~{pages} pages")
            time.sleep(args.delay)

    if not all_records:
        print("\nNo records found — check that categories are correct and the site is accessible")
        return

    # Deduplicate keeping first category encountered
    df_new = pd.DataFrame(all_records)
    df_new = df_new.drop_duplicates(subset=["Slug"], keep="first")

    # Also include Wayback slugs not already in the new set
    wb_only = known_slugs - set(df_new["Slug"].tolist())
    if wb_only:
        # sorted() keeps the row order reproducible; iterating the set
        # directly would vary between runs.
        df_wb_extra = pd.DataFrame([
            {"Slug": s, "Category": "wayback", "Source": "purplecarrot_wayback"}
            for s in sorted(wb_only)
        ])
        df_new = pd.concat([df_new, df_wb_extra], ignore_index=True)

    args.out.parent.mkdir(parents=True, exist_ok=True)
    df_new.to_parquet(args.out, index=False)

    new_count = len(df_new)
    cat_count = len(df_new[df_new["Source"] == "purplecarrot_category"])
    print(f"\nDone — {new_count} total slugs saved to {args.out}")
    print(f" {cat_count} from category pages, {new_count - cat_count} from Wayback only")
if __name__ == "__main__":
main()

View file

@ -31,7 +31,7 @@ import requests
logger = logging.getLogger(__name__)
CDX_BASE = "http://web.archive.org/cdx/search/cdx"
CDX_BASE = "https://web.archive.org/cdx/search/cdx"
WB_BASE = "https://web.archive.org/web"
PC_HOST = "www.purplecarrot.com"
@ -291,6 +291,9 @@ def main() -> None:
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
from scripts.pipeline.log_utils import attach_pipeline_log
attach_pipeline_log("discover_wayback")
discover(args.out)

View file

@ -0,0 +1,250 @@
"""Playwright scraper for live purplecarrot.com recipe pages.
Uses the slug inventory in recipes_purplecarrot.parquet by default (or in any
parquet passed via --slugs-from) and fills in the missing
ingredients/instructions by hitting the live site directly.
Usage:
conda run -n cf python3 scripts/pipeline/purple_carrot/scrape_live.py \
[--slugs-from /Library/Assets/kiwi/pipeline/recipes_purplecarrot_menu.parquet] \
[--out /Library/Assets/kiwi/pipeline/recipes_purplecarrot_live.parquet] \
[--delay 2.5] \
[--limit 20] \
[--resume]
"""
from __future__ import annotations
import argparse
import json
import re
import time
from pathlib import Path
from typing import Any
import pandas as pd
from playwright.sync_api import sync_playwright, Page, TimeoutError as PWTimeout
# ── Config ─────────────────────────────────────────────────────────────────────
BASE_URL = "https://www.purplecarrot.com/recipe/{slug}"
DEFAULT_OUT = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot_live.parquet")
EXISTING_PARQUET = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot.parquet")
RENDER_WAIT_MS = 2500 # JS render settle time
NAV_TIMEOUT_MS = 20_000
# ── Page parser ────────────────────────────────────────────────────────────────
def _text(page: Page, selector: str) -> str:
    """Return the stripped inner text of the first element matching *selector*, or ""."""
    el = page.query_selector(selector)
    return el.inner_text().strip() if el else ""


def _texts(page: Page, selector: str) -> list[str]:
    """Return the stripped inner text of every element matching *selector*."""
    return [el.inner_text().strip() for el in page.query_selector_all(selector)]


def _split_steps(inst_block: str) -> list[str]:
    """Group the lines of the instructions section into steps.

    A line made only of digits is a step number and closes the step being
    collected; the other non-blank lines are joined with single spaces.
    Parsing stops at the first line starting with "CULINARY NOTES".
    """
    steps: list[str] = []
    current: list[str] = []
    for raw_line in inst_block.splitlines():
        line = raw_line.strip()
        if not line:
            continue
        if line.startswith("CULINARY NOTES"):
            break
        if re.match(r"^\d+$", line):
            if current:
                steps.append(" ".join(current))
                current = []
        else:
            current.append(line)
    if current:
        steps.append(" ".join(current))
    return steps


def _first_number(pattern: str, text: str) -> float | None:
    """Return group 1 of the first *pattern* match in *text* as a float, else None."""
    m = re.search(pattern, text)
    return float(m.group(1)) if m else None


def _parse_recipe(page: Page, slug: str, source_url: str) -> dict[str, Any] | None:
    """Extract structured recipe data from the rendered page.

    Parsing works on the visible text of ``<body>``, using the "Ingredients",
    "Instructions" and "Shop" lines as section markers.

    Returns None when the body contains "Page Not Found", when *slug* is not
    part of ``page.url`` (bounced to another page), or when there is no
    "Ingredients" section.  Otherwise returns the recipe record;
    ``HasFullRecipe`` is True only when both ingredients and instructions
    were found.
    """
    ing_marker = "\nIngredients\n"
    inst_marker = "\nInstructions\n"

    body = page.inner_text("body")

    # Abort if we've been bounced to a generic listing / 404
    if "Page Not Found" in body or slug not in page.url:
        return None

    # ── Title ──────────────────────────────────────────────────────────────
    title = _text(page, "h1") or _text(page, "[class*='recipe-title']")
    if not title:
        # Fallback: last non-blank line before "Ingredients".  There may be
        # no such line (e.g. the body starts with the marker), in which case
        # the title stays empty.
        idx = body.find("Ingredients\n")
        preceding = body[:idx].strip().splitlines() if idx > 0 else []
        title = preceding[-1].strip() if preceding else ""

    # ── Ingredients / Instructions via body text ───────────────────────────
    ing_start = body.find(ing_marker)
    inst_start = body.find(inst_marker)
    footer_start = body.find("\nShop\n")  # footer sentinel

    if ing_start == -1:
        return None  # page didn't render recipe content

    raw_ingredients: list[str] = []
    raw_instructions: list[str] = []

    if inst_start != -1:
        # Ingredients run up to the Instructions marker, so they can only be
        # delimited when that marker exists.
        ing_block = body[ing_start + len(ing_marker):inst_start]
        raw_ingredients = [ln.strip() for ln in ing_block.splitlines() if ln.strip()]

        end = footer_start if footer_start > inst_start else len(body)
        raw_instructions = _split_steps(body[inst_start + len(inst_marker):end])

    # ── Nutrition ──────────────────────────────────────────────────────────
    cal = _first_number(r"(\d+)\s*CAL", body)
    fat = _first_number(r"(\d+(?:\.\d+)?)g\s*FAT", body)
    carbs = _first_number(r"(\d+(?:\.\d+)?)g\s*CARBS", body)
    prot = _first_number(r"(\d+(?:\.\d+)?)g\s*PROTEIN", body)
    fiber = _first_number(r"(\d+(?:\.\d+)?)g\s*FIBER", body)

    # ── Allergens / tags ───────────────────────────────────────────────────
    allergen_m = re.search(r"Allergens?:\s*([^\n]+)", body)
    allergens = allergen_m.group(1).strip() if allergen_m else ""

    # Feature tags like HIGH-PROTEIN, QUICK, etc. appear before Ingredients
    pre_ing = body[:ing_start]
    tags = re.findall(r"\b(HIGH-PROTEIN|QUICK|SPICY|LOW[\-\s]CALORIE|VEGAN|FAMILY\s+FRIENDLY)\b", pre_ing)

    return {
        "Slug": slug,
        "Name": title,
        "SourceURL": source_url,
        "Source": "purplecarrot_live",
        "RecipeIngredientParts": raw_ingredients,
        "RecipeInstructions": raw_instructions,
        "Calories": cal,
        "FatContent": fat,
        "CarbohydrateContent": carbs,
        "ProteinContent": prot,
        "FiberContent": fiber,
        "Allergens": allergens,
        "Keywords": tags,
        "HasFullRecipe": bool(raw_ingredients and raw_instructions),
    }
# ── Main ───────────────────────────────────────────────────────────────────────
def main() -> None:
    """CLI entry point: scrape inventory slugs from the live site into a parquet.

    Slugs come from ``--slugs-from`` (or ``EXISTING_PARQUET`` by default).
    With ``--resume``, slugs already present in ``--out`` are skipped and the
    new rows are merged into that file.  ``--limit`` caps how many slugs are
    actually scraped in this run, so it is applied after the resume filter.
    Nothing is written when no recipe was scraped.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--out", type=Path, default=DEFAULT_OUT)
    parser.add_argument("--delay", type=float, default=2.5,
                        help="Seconds between requests (be polite)")
    parser.add_argument("--limit", type=int, default=0,
                        help="Stop after N slugs (0 = all)")
    parser.add_argument("--resume", action="store_true",
                        help="Skip slugs already present in --out")
    parser.add_argument("--slugs-from", type=Path, default=None,
                        help="Read slug inventory from this parquet instead of the default Wayback one")
    args = parser.parse_args()

    # Load slug inventory — either from a custom parquet or the default Wayback run
    slugs_parquet = args.slugs_from if args.slugs_from else EXISTING_PARQUET
    df_existing = pd.read_parquet(slugs_parquet)
    slugs = df_existing["Slug"].dropna().unique().tolist()
    # SourceURL may not be present in custom parquets — fall back to constructing from slug
    if "SourceURL" in df_existing.columns:
        source_urls = dict(zip(df_existing["Slug"], df_existing["SourceURL"]))
    else:
        source_urls = {s: BASE_URL.format(slug=s) for s in slugs}

    # Resume support: the previous output is read once and reused for the
    # final merge.
    df_prev = None
    done_slugs: set[str] = set()
    if args.resume and args.out.exists():
        df_prev = pd.read_parquet(args.out)
        done_slugs = set(df_prev["Slug"].dropna().tolist())
        print(f"Resuming — {len(done_slugs)} slugs already scraped")

    # Drop finished slugs before applying --limit; limiting first would pick
    # the same already-done slugs on every resumed run and scrape nothing new.
    pending = [s for s in slugs if s not in done_slugs]
    skipped = len(slugs) - len(pending)
    if args.limit:
        pending = pending[: args.limit]

    results: list[dict[str, Any]] = []
    failed = 0

    _UA = (
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
    )

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        try:
            for i, slug in enumerate(pending, start=1):
                url = BASE_URL.format(slug=slug)
                # end=" " separates the slug from the status printed below.
                print(f"[{i}/{len(pending)}] {slug}", end=" ", flush=True)

                # Use a fresh browser context per slug to avoid Cloudflare session-level
                # bot detection, which fires on the 2nd+ request in the same context.
                context = browser.new_context(
                    user_agent=_UA,
                    viewport={"width": 1280, "height": 900},
                )
                page = context.new_page()
                try:
                    page.goto(url, timeout=NAV_TIMEOUT_MS, wait_until="domcontentloaded")
                    page.wait_for_timeout(RENDER_WAIT_MS)
                    recipe = _parse_recipe(page, slug, source_urls.get(slug, url))
                except PWTimeout:
                    print("TIMEOUT")
                    failed += 1
                except Exception as exc:
                    # Best effort: one bad page must not end the whole run.
                    print(f"ERROR: {exc}")
                    failed += 1
                else:
                    if recipe is None:
                        print("no content (404 or redirect)")
                        failed += 1
                    elif recipe["HasFullRecipe"]:
                        n = len(recipe["RecipeIngredientParts"])
                        s = len(recipe["RecipeInstructions"])
                        print(f"OK ({n} ingredients, {s} steps)")
                        results.append(recipe)
                    else:
                        print(f"partial (ings={len(recipe['RecipeIngredientParts'])}, steps={len(recipe['RecipeInstructions'])})")
                        results.append(recipe)
                finally:
                    context.close()
                    time.sleep(args.delay)
        finally:
            # Close the browser even when the loop is interrupted.
            browser.close()

    print(f"\nDone — {len(results)} scraped, {skipped} skipped, {failed} failed")

    if results:
        df_out = pd.DataFrame(results)
        args.out.parent.mkdir(parents=True, exist_ok=True)
        if df_prev is not None:
            # Resumed run: append to the previous output; the newest row wins.
            df_out = pd.concat([df_prev, df_out], ignore_index=True)
            df_out = df_out.drop_duplicates(subset=["Slug"], keep="last")
        df_out.to_parquet(args.out, index=False)
        full_count = df_out["HasFullRecipe"].sum() if "HasFullRecipe" in df_out.columns else "?"
        print(f"Saved {len(df_out)} rows to {args.out} ({full_count} with full recipes)")
    else:
        print("No results — output not written")
if __name__ == "__main__":
main()

View file

@ -37,12 +37,12 @@ import requests
logger = logging.getLogger(__name__)
CDX_BASE = "http://web.archive.org/cdx/search/cdx"
CDX_BASE = "https://web.archive.org/cdx/search/cdx"
WB_BASE = "https://web.archive.org/web"
PC_HOST = "www.purplecarrot.com"
REPLAY_DELAY = 1.2
CDX_DELAY = 0.5
REPLAY_DELAY = 2.0
CDX_DELAY = 3.0 # archive.org CDX rate-limits aggressively; be polite
DEFAULT_SLUGS = Path("/Library/Assets/kiwi/pipeline/pc_slugs.jsonl")
DEFAULT_OUT = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot.parquet")
@ -54,29 +54,41 @@ _REDUX_STATE_RE = re.compile(r'window\.__INITIAL_STATE__\s*=\s*(\{.*?\});\s*\n',
# ── Wayback helpers ───────────────────────────────────────────────────────────
def _cdx_get(params: dict) -> list:
    """CDX request with retry on 429/503 (archive.org rate-limits aggressively).

    Makes up to four attempts.  A 429/503 response is followed by an
    exponential back-off (15s, 30s, 60s); a request or JSON-decoding failure
    by a shorter linear one (5s, 10s, 15s).  No sleep follows the final
    attempt, since nothing would be retried after it.

    Returns the decoded JSON rows, or [] when the response was empty or
    every attempt failed.
    """
    max_attempts = 4
    for attempt in range(max_attempts):
        try:
            resp = requests.get(CDX_BASE, params=params, timeout=25)
            if resp.status_code in (429, 503):
                wait = 15 * (2 ** attempt)
                logger.debug("CDX %s — backing off %ds", resp.status_code, wait)
            else:
                resp.raise_for_status()
                rows = resp.json()
                return rows if rows else []
        except (requests.RequestException, ValueError) as exc:
            # ValueError covers a body that is not valid JSON.
            wait = 5 * (attempt + 1)
            logger.debug("CDX attempt %d failed: %s", attempt + 1, exc)
        if attempt < max_attempts - 1:
            time.sleep(wait)
    logger.warning(
        "CDX gave up after %d attempts for %s", max_attempts, params.get("url")
    )
    return []
def _cdx_timestamps(slug: str) -> list[str]:
"""Return all captured timestamps for a product slug, oldest first."""
url = f"{PC_HOST}/api/v1/products/{slug}"
try:
resp = requests.get(
CDX_BASE,
params={
"url": url,
"output": "json",
"fl": "timestamp,statuscode",
"filter": "statuscode:200",
"limit": "20",
},
timeout=20,
)
resp.raise_for_status()
rows = resp.json()
if len(rows) < 2:
return []
return [row[0] for row in rows[1:]] # timestamps only, oldest first
except Exception as exc:
logger.debug("CDX timestamps failed for %s: %s", slug, exc)
"""Return captured timestamps for a product slug, oldest first (pre-2022 window)."""
rows = _cdx_get({
"url": f"{PC_HOST}/api/v1/products/{slug}",
"output": "json",
"fl": "timestamp,statuscode",
"filter": "statuscode:200",
"limit": "20",
# Pre-HelloFresh-acquisition captures (2019-2021) are most likely
# to have full instructions — API stripped them post-acquisition.
"from": "20190101",
"to": "20211231",
})
if len(rows) < 2:
return []
return [row[0] for row in rows[1:]] # timestamps only, oldest first
def _wayback_json(url: str, timestamp: str) -> Any | None:
@ -172,6 +184,9 @@ def _extract_from_api(data: dict) -> dict | None:
description = sku.get("description") or ""
images = sku.get("hero_images") or sku.get("image_versions") or []
# hero_images can be a list OR a dict keyed by size string — normalise to list
if isinstance(images, dict):
images = list(images.values())
image_url = ""
if images and isinstance(images[0], dict):
image_url = images[0].get("image_url") or images[0].get("url") or ""
@ -319,23 +334,14 @@ def fetch_recipe(slug: str, manifest_meta: dict) -> dict | None:
# HTML fallback when API has no steps/ingredients
if not recipe or not recipe.get("has_full_recipe"):
html_cdx_url = f"{PC_HOST}/recipe/{slug}"
try:
html_resp = requests.get(
CDX_BASE,
params={
"url": html_cdx_url,
"output": "json",
"fl": "timestamp,statuscode",
"filter": "statuscode:200",
"limit": "5",
},
timeout=20,
)
html_ts_rows = html_resp.json() if html_resp.ok else []
html_timestamps = [row[0] for row in html_ts_rows[1:]] if len(html_ts_rows) > 1 else []
except Exception:
html_timestamps = []
html_ts_rows = _cdx_get({
"url": f"{PC_HOST}/recipe/{slug}",
"output": "json",
"fl": "timestamp,statuscode",
"filter": "statuscode:200",
"limit": "10",
})
html_timestamps = [row[0] for row in html_ts_rows[1:]] if len(html_ts_rows) > 1 else []
time.sleep(CDX_DELAY)
for ts in html_timestamps:
@ -522,6 +528,9 @@ def main() -> None:
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
from scripts.pipeline.log_utils import attach_pipeline_log
attach_pipeline_log("scrape_recipes")
scrape(args.slugs, args.out, resume=args.resume)