Compare commits

..

12 commits
v0.9.2 ... main

Author SHA1 Message Date
cf807179f5 docs: add LLM development disclosure to README
Some checks failed
CI / Backend (Python) (push) Has been cancelled
CI / Frontend (Vue) (push) Has been cancelled
Mirror / mirror (push) Has been cancelled
Humans own design, architecture, code review, testing, and
verification. LLMs are part of our development workflow.
Links to circuitforge.tech/positions for our full position.
2026-05-28 08:20:17 -07:00
0c200f3148 feat(pipeline): ingest_purplecarrot.py — upsert scraped recipes into corpus DB
Some checks failed
CI / Backend (Python) (push) Has been cancelled
CI / Frontend (Vue) (push) Has been cancelled
Mirror / mirror (push) Has been cancelled
- Maps Purple Carrot parquet columns to recipes table schema:
  Slug → external_id (pc_<slug>), Name → title,
  RecipeIngredientParts/RecipeInstructions → ingredients/directions
- Sets source='purplecarrot', category='meal-kit', servings=2
- Allergens encoded as allergen:<tag> keywords alongside HIGH-PROTEIN etc.
- Handles numpy ndarray columns from parquet (not plain Python lists)
- Upserts: insert new, update existing — safe to run repeatedly

Wire step 3 (ingest) into weekly_harvest.sh so the full pipeline is:
  1. discover_current_menu.py → parquet of active menu slugs
  2. scrape_live.py --resume  → scrape only new slugs, append to live parquet
  3. ingest_purplecarrot.py   → upsert into /Library/Assets/kiwi/kiwi.db
2026-05-21 16:43:23 -07:00
21a0664961 feat(pipeline): weekly Purple Carrot harvest script + cron
Some checks are pending
CI / Backend (Python) (push) Waiting to run
CI / Frontend (Vue) (push) Waiting to run
Mirror / mirror (push) Waiting to run
Add weekly_harvest.sh wrapper that:
- Runs discover_current_menu.py to fetch this week's 23 active menu slugs
- Runs scrape_live.py with --resume to scrape only new slugs
- Appends timestamped output to /Library/Assets/kiwi/pipeline/logs/

Cron entry added to system crontab:
  0 23 * * 0 (every Sunday 23:00)
Logs: /Library/Assets/kiwi/pipeline/logs/purple_carrot_harvest.log
2026-05-21 16:22:26 -07:00
a9ab996bcc feat(pipeline): purple carrot weekly menu scraper with CF bypass
Some checks are pending
CI / Backend (Python) (push) Waiting to run
CI / Frontend (Vue) (push) Waiting to run
Mirror / mirror (push) Waiting to run
Add three new scripts for Purple Carrot recipe pipeline:

- discover_current_menu.py: fetches this week's active menu slugs from
  /plant-based-recipes using requests (server-rendered HTML, no JS needed).
  Accumulates slugs across weekly runs for building a recipe corpus over time.

- discover_slugs_categories.py: crawls recipe-category listing pages with
  ?page=N pagination to discover historical slug inventory. Note: category
  archive slugs (past menu items) 404 when scraped live; only use for
  identifying currently-featured recipes per category.

- scrape_live.py: updated with --slugs-from flag (load slug inventory from
  any parquet, not just the default Wayback one) and fresh-context-per-slug
  pattern to bypass Cloudflare session-level bot detection (which fires on
  the 2nd+ request in a shared browser context).

Discovery: the live site only renders full ingredient/instruction content for
recipes currently on the active weekly menu. 23/23 current menu recipes
scraped successfully (100% hit rate vs ~1% for archived slugs).
2026-05-21 16:16:32 -07:00
56f942b3fd feat(pipeline): Purple Carrot scraper hardening + shared pipeline logging
scrape_recipes.py:
- Switch CDX to HTTPS (avoids HTTP 503 rate-limit bucket)
- Restrict product API CDX to 2019–2021 window (pre-HelloFresh instruction stripping)
- Replace inline CDX requests with _cdx_get() helper: retries on 429/503 with
  exponential backoff (15s, 30s, 60s, 120s)
- Increase HTML fallback CDX limit from 5 to 10 timestamps
- Bump CDX_DELAY 0.5s → 3.0s and REPLAY_DELAY 1.2s → 2.0s (polite scraping)
- Fix KeyError: 0 on hero_images dict (normalise dict to list before indexing)

discover_wayback.py:
- Switch CDX to HTTPS

scripts/pipeline/log_utils.py (new):
- attach_pipeline_log(script_name): adds a JSON FileHandler to the root logger
  writing to /Library/Assets/logs/pipeline/<script>_<ts>.jsonl for Avocet
  Turnstone training data ingestion (kiwi#141 / avocet#67)
2026-05-17 13:35:35 -07:00
84636bcdaf docs: bump version badge to match latest Forgejo release
Some checks failed
CI / Backend (Python) (push) Has been cancelled
CI / Frontend (Vue) (push) Has been cancelled
Mirror / mirror (push) Has been cancelled
2026-05-17 11:19:12 -07:00
51a48a430b feat(config): add GPU_SERVER_URL alias for CF_ORCH_URL
Some checks failed
CI / Backend (Python) (push) Waiting to run
CI / Frontend (Vue) (push) Waiting to run
Mirror / mirror (push) Has been cancelled
Release / release (push) Has been cancelled
Self-hoster-friendly env var name. Priority: GPU_SERVER_URL →
CF_ORCH_URL (compat) → https://orch.circuitforge.tech when
CF_LICENSE_KEY is present (Paid+ auto-default). Resolved value
written back to os.environ["CF_ORCH_URL"] at startup so all
service callers remain unchanged.

Bump version to 0.10.0.
2026-05-17 09:42:48 -07:00
b326d4aa6e fix(config): add CF_ORCH_URL to local env for recipe scan + LLM features
Without CF_ORCH_URL set, _call_vision_backend() skips cf-orch entirely
and falls through to local VLM (no GPU in container) then fails.
.env gets CF_ORCH_URL=http://10.1.10.71:7700 for the local rack.
.env.example updated with documentation for self-hosters.

Local scan confirmed: cf-docuvision (Sif, GGUF) → ollama llama3.1:8b → 200 OK.
2026-05-17 09:21:33 -07:00
7cad503b35 feat(pipeline): Purple Carrot recipe corpus scraper via Wayback Machine
discover_wayback.py — enumerates recipe slugs from archived menu API
  (/api/v2/menus/<id>) and product API (/api/v1/products/*) plus
  recipe-category HTML pages. Writes incremental JSONL manifest to
  /Library/Assets/kiwi/pipeline/pc_slugs.jsonl.

scrape_recipes.py — fetches full recipe data per slug using three-tier
  fallback: product API JSON (oldest captures first), HTML inline state
  (__NEXT_DATA__ / __INITIAL_STATE__), and JSON-LD structured data.
  Outputs recipes_purplecarrot.parquet in food.com columnar format so
  build_recipe_index.py imports it unchanged. Includes SourceURL column
  for recipe attribution UI (kiwi#139). Checkpoints every 50 recipes.

Initial discovery: 158 slugs from menu 1536 + product_api pass.
Re-run discover_wayback.py after archive.org stabilizes to pick up
older slugs from recipe-category pages.

Backlog: live Playwright scraper for post-Wayback recipes (kiwi#137).
2026-05-17 09:16:35 -07:00
430600c1af fix(recipe_scan): harden JSON parser for real-world LLM output quirks
- Strip <think>/<thinking> blocks before parsing (Qwen3/DeepSeek-R1 emit
  these before the actual JSON answer)
- Replace greedy regex with brace-balanced _extract_json_object() so
  trailing prose after } doesn't corrupt the extract
- Use non-greedy fence regex to pull JSON from inside ```json blocks
- Pass system= to LLMRouter.complete() with a terse JSON-only instruction
  so Ollama models receive it as a system message, not buried in the user turn
- Add logger.warning() on parse failure so raw output is diagnosable
2026-05-17 08:30:55 -07:00
21a9b85067 fix(recipe_scan): revert to cf-docuvision path (GGUF backend now works)
Route recipe_scan back through task_allocate -> cf-docuvision -> DocuvisionClient
now that docuvision supports GGUF models via Qwen25VLChatHandler.

Two-step pipeline: docuvision OCRs image(s), LLMRouter structures OCR text to JSON.
Removes the non-functional cf-text image_url path (cf-text rejects content arrays).
2026-05-16 19:25:01 -07:00
c72b4415db feat(recipe_scan): use Qwen2-VL GGUF via cf-text OpenAI-compat API
Replace two-step docuvision OCR + LLM structuring pipeline with a
single multimodal VLM call. The bartowski Qwen2-VL-7B-Instruct Q5_K_M
GGUF is served by cf-text (llama.cpp) and accepts image_url content
blocks identical to the OpenAI vision API format.

Removes docuvision dependency for recipe scanning; the addict-missing /
DeepseekVLV2Processor-missing cf-docuvision error no longer blocks scans.
Receipt OCR (kiwi.ocr task) still routes to cf-docuvision separately.
2026-05-16 18:38:21 -07:00
14 changed files with 1878 additions and 31 deletions

View file

@ -21,10 +21,12 @@ DATA_DIR=./data
# IP this machine advertises to the coordinator (must be reachable from coordinator host)
# CF_ORCH_ADVERTISE_HOST=10.1.10.71
# CF-core hosted coordinator (managed cloud GPU inference — Paid+ tier)
# Set CF_ORCH_URL to use a hosted cf-orch coordinator instead of self-hosting.
# CF_LICENSE_KEY is read automatically by CFOrchClient for bearer auth.
# CF_ORCH_URL=https://orch.circuitforge.tech
# GPU inference server (cf-orch coordinator for recipe scan, LLM generation, etc.)
# GPU_SERVER_URL: set to your local cf-orch coordinator (self-hosted rack).
# CF_ORCH_URL is the backward-compat alias — both are honoured.
# Paid+ default: when CF_LICENSE_KEY is present and neither URL is set,
# the app automatically points to https://orch.circuitforge.tech.
# GPU_SERVER_URL=http://10.1.10.71:7700
# CF_LICENSE_KEY=CFG-KIWI-xxxx-xxxx-xxxx
# LLM backend — env-var auto-config (no llm.yaml needed for bare-metal users)
@ -57,6 +59,9 @@ CF_APP_NAME=kiwi
# Unset = auto-detect: true if CLOUD_MODE or circuitforge_orch is installed (paid+ local).
# Set false to force LocalScheduler even when cf-orch is present.
# USE_ORCH_SCHEDULER=false
# GPU_SERVER_URL: cf-orch coordinator endpoint. Required for recipe scan (cf-docuvision)
# and LLM features on a self-hosted rack. CF_ORCH_URL is the backward-compat alias.
# GPU_SERVER_URL=http://10.1.10.71:7700
# Cloud mode (set in compose.cloud.yml; also set here for reference)
# CLOUD_DATA_ROOT=/devl/kiwi-cloud-data

View file

@ -8,7 +8,7 @@
[![License: MIT/BSL](https://img.shields.io/badge/license-MIT%20%2F%20BSL%201.1-blue)](#license)
[![CI](https://git.opensourcesolarpunk.com/Circuit-Forge/kiwi/badges/workflows/ci.yml/badge.svg)](https://git.opensourcesolarpunk.com/Circuit-Forge/kiwi/actions)
[![Version](https://img.shields.io/badge/version-0.5.0--beta-green)](https://git.opensourcesolarpunk.com/Circuit-Forge/kiwi/releases)
[![Version](https://img.shields.io/badge/version-0.6.0-green)](https://git.opensourcesolarpunk.com/Circuit-Forge/kiwi/releases)
[Documentation](https://docs.circuitforge.tech/kiwi) · [Live demo](https://menagerie.circuitforge.tech/kiwi) · [circuitforge.tech](https://circuitforge.tech)
@ -113,4 +113,6 @@ Kiwi uses a split license:
- **Discovery and inventory pipeline** (barcode scan, expiry tracking, pantry CRUD, CSV export, recipe browser): [MIT](LICENSE-MIT)
- **AI features** (receipt OCR, LLM recipe suggestions, style auto-classifier): [BSL 1.1](LICENSE-BSL) — free for personal non-commercial self-hosting; commercial use or SaaS re-hosting requires a paid license. Converts to MIT after 4 years.
Humans own design, architecture, code review, testing, and verification. LLMs are part of our development workflow. [Our positions on LLM use →](https://circuitforge.tech/positions)
Privacy · Safety · Accessibility — co-equal, non-negotiable across all CircuitForge products.

View file

@ -65,9 +65,24 @@ class Settings:
# Quality
MIN_QUALITY_SCORE: float = float(os.environ.get("MIN_QUALITY_SCORE", "50.0"))
# CF-core resource coordinator (VRAM lease management)
# CF-core resource coordinator (VRAM lease management — lease broker, not inference)
COORDINATOR_URL: str = os.environ.get("COORDINATOR_URL", "http://localhost:7700")
# GPU inference server URL
# Priority: GPU_SERVER_URL env var → CF_ORCH_URL env var (backward compat)
# → https://orch.circuitforge.tech when CF_LICENSE_KEY is present (Paid+)
# Resolved value is written back to os.environ["CF_ORCH_URL"] at startup so
# all service-layer callers that read CF_ORCH_URL directly see the right URL.
GPU_SERVER_URL: str | None = (
os.environ.get("GPU_SERVER_URL")
or os.environ.get("CF_ORCH_URL")
or (
"https://orch.circuitforge.tech"
if os.environ.get("CF_LICENSE_KEY")
else None
)
)
# Hosted cf-orch coordinator — bearer token for managed cloud GPU inference (Paid+)
# CFOrchClient reads CF_LICENSE_KEY automatically; exposed here for startup validation.
CF_LICENSE_KEY: str | None = os.environ.get("CF_LICENSE_KEY")
@ -108,3 +123,9 @@ class Settings:
settings = Settings()
# Normalise GPU_SERVER_URL into CF_ORCH_URL so every service-layer caller that
# reads os.environ.get("CF_ORCH_URL") sees the resolved value, including the
# Paid+ cloud default injected above.
if settings.GPU_SERVER_URL:
os.environ["CF_ORCH_URL"] = settings.GPU_SERVER_URL

View file

@ -215,6 +215,35 @@ def _build_ocr_extraction_prompt(ocr_text: str) -> str:
)
def _call_via_cf_text_vlm(alloc_url: str, image_paths: list[Path], prompt: str) -> str:
"""Call the cf-text OpenAI-compat API with images via the llama.cpp multimodal backend."""
import httpx
content: list[dict] = []
for i, path in enumerate(image_paths):
if i > 0:
content.append({"type": "text", "text": f"(Page {i + 1} of the same recipe:)"})
b64 = _load_image_b64(path)
content.append({
"type": "image_url",
"image_url": {"url": f"data:image/jpeg;base64,{b64}"},
})
content.append({"type": "text", "text": prompt})
resp = httpx.post(
f"{alloc_url.rstrip('/')}/v1/chat/completions",
json={
"model": "local",
"messages": [{"role": "user", "content": content}],
"max_tokens": 2048,
"temperature": 0.0,
},
timeout=180.0,
)
resp.raise_for_status()
return resp.json()["choices"][0]["message"]["content"].strip()
def _call_vision_backend(
image_paths: list[Path],
prompt: str,
@ -222,7 +251,7 @@ def _call_vision_backend(
) -> str:
"""Dispatch to the best available vision backend.
Priority: cf-orch docuvision (OCR + text LLM) -> local Qwen2.5-VL -> Anthropic API.
Priority: cf-orch (Qwen2-VL GGUF via cf-text) -> local Qwen2.5-VL -> Anthropic API.
Raises RuntimeError with a clear message when no backend is available.
Args:
@ -237,9 +266,8 @@ def _call_vision_backend(
errors: list[str] = []
# 1. Try cf-orch task allocation → cf-docuvision OCR, then text LLM structuring.
# Two-step: docuvision extracts text from the image(s), then LLMRouter
# converts the OCR text to structured recipe JSON using the extraction prompt.
# 1. Try cf-orch task allocation → cf-docuvision (Qwen2-VL GGUF via llama.cpp).
# Two-step: docuvision OCRs the image(s), then LLMRouter structures the text into JSON.
cf_orch_url = os.environ.get("CF_ORCH_URL")
if cf_orch_url:
try:
@ -250,7 +278,6 @@ def _call_vision_backend(
try:
_progress("allocating", "Starting vision service...")
with task_allocate("kiwi", "recipe_scan", service_hint="cf-docuvision", ttl_s=120.0) as alloc:
# Step 1: OCR each image via cf-docuvision
_progress("scanning", "Extracting recipe text from photo...")
doc_client = DocuvisionClient(alloc.url)
ocr_parts: list[str] = []
@ -263,9 +290,11 @@ def _call_vision_backend(
if not combined_ocr.strip():
raise ValueError("Docuvision returned no text — image may not be a recipe")
# Step 2: Text LLM structures OCR output into recipe JSON
_progress("structuring", "Parsing recipe structure...")
text = LLMRouter().complete(_build_ocr_extraction_prompt(combined_ocr))
text = LLMRouter().complete(
_build_ocr_extraction_prompt(combined_ocr),
system="You are a recipe data extractor. Return ONLY valid JSON. No markdown, no explanation, no code fences.",
)
if text:
return text
@ -303,40 +332,76 @@ def _normalize_ingredient_name(name: str) -> str:
return name.lower().strip()
def _extract_json_object(text: str) -> str | None:
"""Return the first balanced JSON object from text, or None if not found.
Uses brace-counting rather than a greedy regex so trailing prose and
nested objects are handled correctly.
"""
start = text.find("{")
if start == -1:
return None
depth = 0
in_string = False
escape_next = False
for i, ch in enumerate(text[start:], start):
if escape_next:
escape_next = False
continue
if ch == "\\" and in_string:
escape_next = True
continue
if ch == '"':
in_string = not in_string
continue
if in_string:
continue
if ch == "{":
depth += 1
elif ch == "}":
depth -= 1
if depth == 0:
return text[start : i + 1]
return None
def _parse_scanner_json(raw_text: str) -> dict:
"""Extract and return the JSON dict from VLM output.
Handles:
- Pure JSON
- JSON wrapped in ```json ... ``` markdown fences
- JSON preceded by a line of prose ("Here is the recipe: {...}")
- JSON in ```json ... ``` markdown fences
- Qwen3-style <think>...</think> or <thinking>...</thinking> preambles
- JSON preceded or followed by prose
Raises ValueError on not_a_recipe or unparseable output.
"""
text = raw_text.strip()
# Strip markdown fences if present
if text.startswith("```"):
parts = text.split("```")
for part in parts:
part = part.strip()
if part.startswith("json"):
part = part[4:].strip()
if part.startswith("{"):
text = part
break
# Strip thinking-token blocks emitted by reasoning models (Qwen3, DeepSeek-R1, etc.)
text = re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL | re.IGNORECASE).strip()
text = re.sub(r"<thinking>.*?</thinking>", "", text, flags=re.DOTALL | re.IGNORECASE).strip()
# Try direct parse first
# Strip markdown fences if present
if "```" in text:
# Find the content between the first ``` pair
fence_match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, re.DOTALL)
if fence_match:
text = fence_match.group(1).strip()
# Try direct parse
try:
data = json.loads(text)
except json.JSONDecodeError:
# Extract first JSON object embedded in prose
match = re.search(r"\{.*\}", text, re.DOTALL)
if not match:
# Fall back to brace-balanced extraction from anywhere in the output
candidate = _extract_json_object(text)
if not candidate:
logger.warning("Could not parse JSON from LLM output (first 400 chars): %r", text[:400])
raise ValueError(f"Could not parse JSON from VLM output: {text[:200]!r}")
try:
data = json.loads(match.group(0))
data = json.loads(candidate)
except json.JSONDecodeError as exc:
logger.warning("Brace-extracted JSON still invalid: %r", candidate[:400])
raise ValueError(f"Could not parse JSON from VLM output: {exc}") from exc
if isinstance(data, dict) and data.get("error") == "not_a_recipe":

View file

@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "kiwi"
version = "0.6.0"
version = "0.10.0"
description = "Pantry tracking + leftover recipe suggestions"
readme = "README.md"
requires-python = ">=3.11"

View file

@ -0,0 +1,218 @@
"""Ingest Purple Carrot scraped recipes into the Kiwi corpus database.
Reads recipes_purplecarrot_live.parquet (output of scrape_live.py) and
upserts into the shared recipes table, setting source='purplecarrot' and
using the recipe slug as the external_id (prefixed pc_).
Run after each weekly_harvest.sh scrape:
conda run -n cf python3 scripts/pipeline/ingest_purplecarrot.py \
[--db /Library/Assets/kiwi/kiwi.db] \
[--parquet /Library/Assets/kiwi/pipeline/recipes_purplecarrot_live.parquet]
"""
from __future__ import annotations
import argparse
import json
import sqlite3
from pathlib import Path
import math
import re
import pandas as pd
# ── Helpers (inlined from build_recipe_index to avoid cross-module import) ─────
_MEASURE_PATTERN = re.compile(
r"^\d[\d\s/¼½¾⅓⅔]*\s*(cup|tbsp|tsp|oz|lb|g|kg|ml|l|clove|slice|piece|can|pkg|package|bunch|head|stalk|sprig|pinch|dash|to taste|as needed)s?\b",
re.IGNORECASE,
)
_LEAD_NUMBER = re.compile(r"^\d[\d\s/¼½¾⅓⅔]*\s*")
_TRAILING_QUALIFIER = re.compile(
r"\s*(to taste|as needed|or more|or less|optional|if desired|if needed)\s*$",
re.IGNORECASE,
)
def _float_or_none(val: object) -> float | None:
try:
v = float(val) # type: ignore[arg-type]
return v if v > 0 else None
except (TypeError, ValueError):
return None
def _safe_list(val: object) -> list:
if val is None:
return []
if isinstance(val, float) and math.isnan(val):
return []
if isinstance(val, list):
return val
# Parquet often deserializes list columns as numpy arrays
try:
import numpy as np
if isinstance(val, np.ndarray):
return val.tolist()
except ImportError:
pass
return []
def _extract_ingredient_names(raw_list: list[str]) -> list[str]:
names = []
for raw in raw_list:
s = raw.lower().strip()
s = _MEASURE_PATTERN.sub("", s)
s = _LEAD_NUMBER.sub("", s)
s = re.sub(r"\(.*?\)", "", s)
s = re.sub(r",.*$", "", s)
s = _TRAILING_QUALIFIER.sub("", s)
s = s.strip(" -.,")
if s and len(s) > 1:
names.append(s)
return names
def _compute_element_coverage(profiles: list[dict]) -> dict[str, float]:
counts: dict[str, int] = {}
for p in profiles:
for elem in p.get("elements", []):
counts[elem] = counts.get(elem, 0) + 1
if not profiles:
return {}
return {e: round(c / len(profiles), 3) for e, c in counts.items()}
# ── Config ─────────────────────────────────────────────────────────────────────
DEFAULT_DB = Path("/Library/Assets/kiwi/kiwi.db")
DEFAULT_PARQUET = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot_live.parquet")
# ── Ingest ─────────────────────────────────────────────────────────────────────
def ingest(db_path: Path, parquet_path: Path) -> None:
df = pd.read_parquet(parquet_path)
# Filter to rows with full recipe data
if "HasFullRecipe" in df.columns:
df = df[df["HasFullRecipe"] == True].copy()
if df.empty:
print("No full recipes found in parquet — nothing to ingest.")
return
print(f"Ingesting {len(df)} Purple Carrot recipes into {db_path}")
conn = sqlite3.connect(db_path)
try:
conn.execute("PRAGMA journal_mode=WAL")
# Pre-load ingredient element profiles for coverage calculation
profile_index: dict[str, list[str]] = {}
for row in conn.execute("SELECT name, elements FROM ingredient_profiles"):
try:
profile_index[row[0]] = json.loads(row[1])
except Exception:
pass
inserted = updated = 0
for _, row in df.iterrows():
slug = str(row.get("Slug", "")).strip()
if not slug:
continue
external_id = f"pc_{slug}"
title = str(row.get("Name", "")).strip()[:500]
if not title:
continue
raw_ingredients = [str(i) for i in _safe_list(row.get("RecipeIngredientParts", []))]
directions = [str(d) for d in _safe_list(row.get("RecipeInstructions", []))]
ingredient_names = _extract_ingredient_names(raw_ingredients)
profiles = [
{"elements": profile_index[n]}
for n in ingredient_names if n in profile_index
]
coverage = _compute_element_coverage(profiles)
# Keywords: merge scraped tags with allergen info
kw_raw = _safe_list(row.get("Keywords", []))
allergens = str(row.get("Allergens", "") or "")
if allergens:
kw_raw = list(kw_raw) + [f"allergen:{a.strip()}" for a in allergens.split(",") if a.strip()]
keywords_json = json.dumps(kw_raw)
# Check if already present (same external_id)
existing = conn.execute(
"SELECT id FROM recipes WHERE external_id = ?", (external_id,)
).fetchone()
params = (
title,
json.dumps(raw_ingredients),
json.dumps(ingredient_names),
json.dumps(directions),
"meal-kit", # category
keywords_json,
_float_or_none(row.get("Calories")),
_float_or_none(row.get("FatContent")),
_float_or_none(row.get("ProteinContent")),
None, # sodium_mg — not scraped
json.dumps(coverage),
None, # sugar_g — not scraped
_float_or_none(row.get("CarbohydrateContent")),
_float_or_none(row.get("FiberContent")),
2.0, # servings — PC meal kits are 2-serving by default
0, # nutrition_estimated — PC provides real data
)
if existing:
conn.execute("""
UPDATE recipes
SET title=?, ingredients=?, ingredient_names=?, directions=?,
category=?, keywords=?, calories=?, fat_g=?, protein_g=?,
sodium_mg=?, element_coverage=?,
sugar_g=?, carbs_g=?, fiber_g=?, servings=?, nutrition_estimated=?
WHERE external_id=?
""", params + (external_id,))
updated += 1
else:
conn.execute("""
INSERT INTO recipes
(external_id, source, title, ingredients, ingredient_names,
directions, category, keywords, calories, fat_g, protein_g,
sodium_mg, element_coverage,
sugar_g, carbs_g, fiber_g, servings, nutrition_estimated)
VALUES (?, 'purplecarrot', ?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
""", (external_id,) + params)
inserted += 1
conn.commit()
finally:
conn.close()
print(f"Done — {inserted} inserted, {updated} updated")
# ── Main ───────────────────────────────────────────────────────────────────────
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument("--db", type=Path, default=DEFAULT_DB)
parser.add_argument("--parquet", type=Path, default=DEFAULT_PARQUET)
args = parser.parse_args()
if not args.parquet.exists():
print(f"ERROR: parquet not found at {args.parquet}")
raise SystemExit(1)
ingest(args.db, args.parquet)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,68 @@
"""
Pipeline logging utility.
Adds a structured JSON FileHandler to the root logger so every pipeline
script automatically writes machine-readable logs to the shared datastore
at /Library/Assets/logs/pipeline/. Avocet ingests these for Turnstone
logreading training (kiwi#141 / avocet#67).
Usage (add near the top of main() after logging.basicConfig):
from scripts.pipeline.log_utils import attach_pipeline_log
attach_pipeline_log("scrape_recipes")
"""
from __future__ import annotations
import json
import logging
import os
from datetime import datetime, timezone
from pathlib import Path
PIPELINE_LOG_DIR = Path(
os.environ.get("PIPELINE_LOG_DIR", "/Library/Assets/logs/pipeline")
)
class _JsonFormatter(logging.Formatter):
def format(self, record: logging.LogRecord) -> str:
payload: dict = {
"ts": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
"level": record.levelname,
"logger": record.name,
"msg": record.getMessage(),
}
if record.exc_info:
payload["exc"] = self.formatException(record.exc_info)
# Any extra kwargs passed via logger.info("...", extra={...})
standard = {
"name", "msg", "args", "levelname", "levelno", "pathname",
"filename", "module", "exc_info", "exc_text", "stack_info",
"lineno", "funcName", "created", "msecs", "relativeCreated",
"thread", "threadName", "processName", "process", "message",
"taskName",
}
extra = {k: v for k, v in record.__dict__.items() if k not in standard}
if extra:
payload["extra"] = extra
return json.dumps(payload)
def attach_pipeline_log(script_name: str) -> Path:
"""Attach a JSON file handler to the root logger for pipeline logging.
Returns the path of the log file created.
"""
PIPELINE_LOG_DIR.mkdir(parents=True, exist_ok=True)
ts = datetime.now(tz=timezone.utc).strftime("%Y%m%dT%H%M%S")
log_path = PIPELINE_LOG_DIR / f"{script_name}_{ts}.jsonl"
handler = logging.FileHandler(log_path, encoding="utf-8")
handler.setLevel(logging.DEBUG)
handler.setFormatter(_JsonFormatter())
logging.getLogger().addHandler(handler)
logging.getLogger(__name__).info(
"Pipeline log: %s", log_path, extra={"script": script_name}
)
return log_path

View file

@ -0,0 +1,120 @@
"""Discover Purple Carrot's current weekly menu recipe slugs.
The main /plant-based-recipes listing page always renders the current week's
menu as server-side HTML. This script pulls those slugs and writes them to a
parquet that can be passed directly to scrape_live.py via --slugs-from.
Run weekly (e.g. via cron) to accumulate new recipes as the menu rotates.
Usage:
conda run -n cf python3 scripts/pipeline/purple_carrot/discover_current_menu.py \
[--out /Library/Assets/kiwi/pipeline/recipes_purplecarrot_menu.parquet]
Then scrape:
conda run -n cf python3 scripts/pipeline/purple_carrot/scrape_live.py \
--slugs-from /Library/Assets/kiwi/pipeline/recipes_purplecarrot_menu.parquet \
--out /Library/Assets/kiwi/pipeline/recipes_purplecarrot_live.parquet \
--resume
"""
from __future__ import annotations
import re
import sys
from datetime import date
from pathlib import Path
import pandas as pd
import requests
from bs4 import BeautifulSoup
# ── Config ─────────────────────────────────────────────────────────────────────
LISTING_URL = "https://www.purplecarrot.com/plant-based-recipes"
BASE_URL = "https://www.purplecarrot.com/recipe/{slug}"
DEFAULT_OUT = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot_menu.parquet")
HEADERS = {
"User-Agent": (
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
}
RECIPE_HREF_RE = re.compile(r"/recipe/([^?#]+)")
# ── Main ───────────────────────────────────────────────────────────────────────
def discover_current_slugs() -> list[str]:
"""Fetch the listing page and return unique recipe slugs from the current menu."""
resp = requests.get(LISTING_URL, headers=HEADERS, timeout=15)
if resp.status_code != 200:
print(f"ERROR: listing page returned HTTP {resp.status_code}", file=sys.stderr)
return []
soup = BeautifulSoup(resp.text, "html.parser")
slugs: list[str] = []
seen: set[str] = set()
for a in soup.find_all("a", href=RECIPE_HREF_RE):
m = RECIPE_HREF_RE.search(a["href"])
if m:
slug = m.group(1)
if slug not in seen:
seen.add(slug)
slugs.append(slug)
return slugs
def main() -> None:
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--out", type=Path, default=DEFAULT_OUT)
args = parser.parse_args()
print(f"Fetching current menu from {LISTING_URL}")
slugs = discover_current_slugs()
if not slugs:
print("No slugs found — the listing page may have changed structure or blocked the request.")
sys.exit(1)
today = date.today().isoformat()
records = [
{
"Slug": slug,
"SourceURL": BASE_URL.format(slug=slug),
"Source": "purplecarrot_menu",
"DiscoveredDate": today,
}
for slug in slugs
]
# Merge with any existing menu parquet (accumulate weeks)
df_new = pd.DataFrame(records)
args.out.parent.mkdir(parents=True, exist_ok=True)
if args.out.exists():
df_prev = pd.read_parquet(args.out)
combined = pd.concat([df_prev, df_new], ignore_index=True)
combined = combined.drop_duplicates(subset=["Slug"], keep="first")
df_new = combined
df_new.to_parquet(args.out, index=False)
print(f"Found {len(slugs)} current-menu slugs this week:")
for s in slugs:
print(f" {s}")
print(f"\nSaved {len(df_new)} total slugs (accumulated) to {args.out}")
print(f"\nTo scrape full recipes:")
print(f" conda run -n cf python3 scripts/pipeline/purple_carrot/scrape_live.py \\")
print(f" --slugs-from {args.out} \\")
print(f" --out /Library/Assets/kiwi/pipeline/recipes_purplecarrot_live.parquet \\")
print(f" --resume")
if __name__ == "__main__":
main()

View file

@ -0,0 +1,218 @@
"""Discover Purple Carrot recipe slugs by crawling all recipe-category listing pages.
The site serves full server-rendered HTML for category pages, paginated via
?page=N. Each page loads 18 recipe cards. This script crawls every category
across all pages and writes a deduplicated slug inventory.
Usage:
conda run -n cf python3 scripts/pipeline/purple_carrot/discover_slugs_categories.py \
[--out /Library/Assets/kiwi/pipeline/recipes_purplecarrot_slugs.parquet] \
[--delay 2.0] \
[--max-pages 50] # safety cap per category (comfort-foods has ~18)
"""
from __future__ import annotations
import argparse
import re
import time
from pathlib import Path
from typing import Any
import pandas as pd
import requests
from bs4 import BeautifulSoup
# ── Config ─────────────────────────────────────────────────────────────────────
BASE = "https://www.purplecarrot.com"
# All known category slugs (from /plant-based-recipes nav)
CATEGORIES: list[str] = [
"comfort-foods",
"family-friendly",
"healthy-desserts",
"holiday-recipes",
"quick-and-easy",
"party-foods",
"seasonal-menu",
"spring-recipes",
"summer-recipes",
"fall-recipes",
"winter-recipes",
"african",
"american",
"asian",
"comfort",
"french",
"indian",
"italian",
"mediterranean",
"mexican",
"middle-eastern",
"soups",
"salads",
"bowls",
"pasta",
"sandwiches-wraps",
"tacos",
"breakfast",
"snacks-sides",
]
DEFAULT_OUT = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot_slugs.parquet")
EXISTING_PARQUET = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot.parquet")
RECIPE_LINK_SELECTOR = "a.c-recipe__title"
SLUG_RE = re.compile(r"/recipe/([^?#]+)")
HEADERS = {
"User-Agent": (
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
}
# ── Helpers ────────────────────────────────────────────────────────────────────
def _fetch_html(url: str, session: requests.Session) -> str | None:
"""Fetch URL and return HTML string, or None on failure."""
try:
resp = session.get(url, headers=HEADERS, timeout=15)
if resp.status_code == 200:
return resp.text
if resp.status_code == 404:
return None # expected end of pagination
print(f" HTTP {resp.status_code}{url}")
return None
except Exception as exc:
print(f" ERROR fetching {url}: {exc}")
return None
def _extract_slugs(html: str) -> list[str]:
"""Pull recipe slugs from one listing-page HTML response."""
soup = BeautifulSoup(html, "html.parser")
slugs: list[str] = []
for a in soup.select(RECIPE_LINK_SELECTOR):
href = a.get("href", "")
m = SLUG_RE.search(href)
if m:
slugs.append(m.group(1))
return slugs
def _get_category_total(html: str) -> int | None:
"""Try to parse the recipe count shown on the category page (e.g. '319 Recipes')."""
m = re.search(r"(\d+)\s+Recipes?\b", html)
return int(m.group(1)) if m else None
def _discover_category(
category: str,
session: requests.Session,
delay: float,
max_pages: int,
) -> tuple[list[str], int]:
"""Crawl all pages of a category, return (slugs, pages_fetched)."""
slugs: list[str] = []
for page_num in range(1, max_pages + 1):
if page_num == 1:
url = f"{BASE}/recipe-categories/{category}"
else:
url = f"{BASE}/recipe-categories/{category}?page={page_num}"
html = _fetch_html(url, session)
if html is None:
break # 404 or error = past the end
page_slugs = _extract_slugs(html)
if not page_slugs:
# Show total if we got a page but no links (category slug may be wrong)
if page_num == 1:
total = _get_category_total(html)
if total is not None:
print(f" page 1 loaded (total={total}) but 0 recipe links — selector may need updating")
break
slugs.extend(page_slugs)
# Print progress
total_hint = _get_category_total(html) if page_num == 1 else None
total_str = f" / {total_hint}" if total_hint else ""
print(f" page {page_num}: +{len(page_slugs)} slugs ({len(slugs)}{total_str} cumulative)")
if len(page_slugs) < 18:
# Short page = last page
break
time.sleep(delay)
return slugs, (len(slugs) + 17) // 18 # approximate pages
# ── Main ───────────────────────────────────────────────────────────────────────
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument("--out", type=Path, default=DEFAULT_OUT)
parser.add_argument("--delay", type=float, default=2.0,
help="Seconds between page requests")
parser.add_argument("--max-pages", type=int, default=50,
help="Safety cap on pages per category")
parser.add_argument("--categories", nargs="*",
help="Crawl only these category slugs (default: all)")
args = parser.parse_args()
categories = args.categories or CATEGORIES
# Seed with any slugs from the Wayback parquet
known_slugs: set[str] = set()
if EXISTING_PARQUET.exists():
df_wb = pd.read_parquet(EXISTING_PARQUET)
known_slugs = set(df_wb["Slug"].dropna().tolist())
print(f"Seeded with {len(known_slugs)} slugs from Wayback parquet")
all_records: list[dict[str, Any]] = []
session = requests.Session()
for category in categories:
print(f"\n[{category}]")
cat_slugs, pages = _discover_category(category, session, args.delay, args.max_pages)
for slug in cat_slugs:
all_records.append({"Slug": slug, "Category": category, "Source": "purplecarrot_category"})
print(f"{len(cat_slugs)} slugs across ~{pages} pages")
time.sleep(args.delay)
if not all_records:
print("\nNo records found — check that categories are correct and the site is accessible")
return
# Deduplicate keeping first category encountered
df_new = pd.DataFrame(all_records)
df_new = df_new.drop_duplicates(subset=["Slug"], keep="first")
# Also include Wayback slugs not already in the new set
if known_slugs:
wb_only = known_slugs - set(df_new["Slug"].tolist())
if wb_only:
df_wb_extra = pd.DataFrame([
{"Slug": s, "Category": "wayback", "Source": "purplecarrot_wayback"}
for s in wb_only
])
df_new = pd.concat([df_new, df_wb_extra], ignore_index=True)
args.out.parent.mkdir(parents=True, exist_ok=True)
df_new.to_parquet(args.out, index=False)
new_count = len(df_new)
cat_count = len(df_new[df_new["Source"] == "purplecarrot_category"])
print(f"\nDone — {new_count} total slugs saved to {args.out}")
print(f" {cat_count} from category pages, {new_count - cat_count} from Wayback only")
if __name__ == "__main__":
main()

View file

@ -0,0 +1,301 @@
"""
discover_wayback.py enumerate Purple Carrot recipe slugs via the Wayback Machine.
Strategy:
1. CDX API all archived /api/v2/menus/* URLs (multiple timestamps)
2. Replay fetch each menu's menuItems, extract productPath slugs
3. CDX API all archived /api/v1/products/* URLs (direct slug capture)
4. CDX API /recipe-categories/* HTML pages for older slugs
5. Deduplicate and write manifest to OUT_FILE
Output (JSONL, one record per recipe):
{"slug": "...", "title": "...", "subtitle": "...", "cook_time": "...",
"tags": [...], "serving_size": 2, "image_url": "...",
"wayback_ts": "20260412150557", "source": "menu|product_api|category_page"}
Usage:
conda run -n cf python -m scripts.pipeline.purple_carrot.discover_wayback
conda run -n cf python -m scripts.pipeline.purple_carrot.discover_wayback --out /Library/Assets/kiwi/pipeline/pc_slugs.jsonl
"""
from __future__ import annotations
import argparse
import json
import logging
import time
from pathlib import Path
from typing import Any
from urllib.parse import urlencode
import requests
logger = logging.getLogger(__name__)
CDX_BASE = "https://web.archive.org/cdx/search/cdx"
WB_BASE = "https://web.archive.org/web"
PC_HOST = "www.purplecarrot.com"
# Polite delay between Wayback replay fetches (seconds)
REPLAY_DELAY = 1.0
CDX_DELAY = 0.5
DEFAULT_OUT = Path("/Library/Assets/kiwi/pipeline/pc_slugs.jsonl")
# ── CDX helpers ───────────────────────────────────────────────────────────────
def cdx_query(url_pattern: str, **kwargs) -> list[dict]:
"""Run a CDX search and return a list of result dicts."""
params = {
"url": url_pattern,
"output": "json",
"fl": "original,timestamp,statuscode",
"collapse": "urlkey",
"filter": "statuscode:200",
**kwargs,
}
for attempt in range(3):
try:
resp = requests.get(CDX_BASE, params=params, timeout=30)
resp.raise_for_status()
rows = resp.json()
if not rows or len(rows) < 2:
return []
headers = rows[0]
return [dict(zip(headers, row)) for row in rows[1:]]
except Exception as exc:
logger.warning("CDX attempt %d failed: %s", attempt + 1, exc)
time.sleep(2 ** attempt)
return []
def wayback_get(url: str, timestamp: str) -> Any | None:
"""Fetch a Wayback replay of a URL and return parsed JSON (or None)."""
replay_url = f"{WB_BASE}/{timestamp}/{url}"
for attempt in range(3):
try:
resp = requests.get(replay_url, timeout=30)
if resp.status_code == 200:
return resp.json()
if resp.status_code == 404:
return None
except Exception as exc:
logger.warning("Wayback GET attempt %d failed for %s: %s", attempt + 1, url, exc)
time.sleep(2 ** attempt)
return None
# ── Slug extraction ───────────────────────────────────────────────────────────
def slug_from_product_path(path: str) -> str | None:
"""'/recipe/foo-bar-baz''foo-bar-baz'."""
if not path:
return None
return path.strip("/").split("/")[-1] or None
def _menu_item_to_record(item: dict, wayback_ts: str) -> dict | None:
slug = slug_from_product_path(item.get("productPath", ""))
if not slug:
return None
return {
"slug": slug,
"title": item.get("title", ""),
"subtitle": item.get("subtitle", ""),
"cook_time": item.get("cookTime", ""),
"tags": item.get("filterTags") or [],
"serving_size": item.get("servingSize"),
"image_url": item.get("imageURL", ""),
"description": item.get("description", ""),
"wayback_ts": wayback_ts,
"source": "menu",
}
# ── Discovery passes ──────────────────────────────────────────────────────────
def pass_menus(seen_slugs: set[str]) -> list[dict]:
"""Walk all archived /api/v2/menus/* captures to extract slugs."""
records: list[dict] = []
# Find all distinct archived menu URLs
menu_cdx = cdx_query(f"{PC_HOST}/api/v2/menus/*", limit="500")
logger.info("CDX: %d archived menu URLs found", len(menu_cdx))
time.sleep(CDX_DELAY)
processed_menu_ids: set[str] = set()
for entry in menu_cdx:
url = entry["original"]
ts = entry["timestamp"]
# Skip the listing endpoint, only process individual menus
if not url.split("?")[0].rstrip("/").split("/")[-1].isdigit():
continue
menu_id = url.split("?")[0].rstrip("/").split("/")[-1]
if menu_id in processed_menu_ids:
continue
processed_menu_ids.add(menu_id)
logger.info("Fetching menu %s (ts=%s) ...", menu_id, ts)
data = wayback_get(url.split("?")[0] + "?logged_out=true", ts)
time.sleep(REPLAY_DELAY)
if not data or "menuItems" not in data:
continue
for item in data["menuItems"]:
rec = _menu_item_to_record(item, ts)
if rec and rec["slug"] not in seen_slugs:
seen_slugs.add(rec["slug"])
records.append(rec)
logger.debug(" + %s", rec["slug"])
logger.info(" %d new slugs (total so far: %d)", len(records), len(seen_slugs))
return records
def pass_product_api(seen_slugs: set[str]) -> list[dict]:
"""Pick up any directly archived /api/v1/products/* URLs the menu pass missed."""
records: list[dict] = []
product_cdx = cdx_query(f"{PC_HOST}/api/v1/products/*", limit="5000")
logger.info("CDX: %d archived product API URLs found", len(product_cdx))
time.sleep(CDX_DELAY)
for entry in product_cdx:
slug = entry["original"].rstrip("/").split("/")[-1]
if not slug or slug in seen_slugs:
continue
seen_slugs.add(slug)
records.append({
"slug": slug,
"title": "",
"subtitle": "",
"cook_time": "",
"tags": [],
"serving_size": None,
"image_url": "",
"description": "",
"wayback_ts": entry["timestamp"],
"source": "product_api",
})
logger.info("product_api pass: %d new slugs", len(records))
return records
def pass_category_pages(seen_slugs: set[str]) -> list[dict]:
"""Parse archived recipe-categories HTML pages for slugs not in the API.
Category pages are rendered SSR/with inline JSON state on older captures,
so we do a simple regex scan for /recipe/<slug> patterns.
"""
import re
records: list[dict] = []
SLUG_RE = re.compile(r'["\s]/recipe/([a-z0-9][a-z0-9\-]{3,})["\s/?]')
cat_cdx = cdx_query(f"{PC_HOST}/recipe-categories/*", limit="200")
logger.info("CDX: %d archived category pages found", len(cat_cdx))
time.sleep(CDX_DELAY)
seen_category_urls: set[str] = set()
for entry in cat_cdx:
url = entry["original"].split("?")[0]
if url in seen_category_urls:
continue
seen_category_urls.add(url)
replay_url = f"{WB_BASE}/{entry['timestamp']}/{url}"
try:
resp = requests.get(replay_url, timeout=30)
time.sleep(REPLAY_DELAY)
if resp.status_code != 200:
continue
except Exception as exc:
logger.warning("Category page fetch failed: %s", exc)
continue
for slug in SLUG_RE.findall(resp.text):
if slug in seen_slugs:
continue
seen_slugs.add(slug)
records.append({
"slug": slug,
"title": "",
"subtitle": "",
"cook_time": "",
"tags": [],
"serving_size": None,
"image_url": "",
"description": "",
"wayback_ts": entry["timestamp"],
"source": "category_page",
})
logger.info("category_pages pass: %d new slugs", len(records))
return records
# ── Main ──────────────────────────────────────────────────────────────────────
def discover(out_file: Path) -> None:
seen: set[str] = set()
# Load previously discovered slugs so reruns are incremental
existing: list[dict] = []
if out_file.exists():
with open(out_file) as f:
for line in f:
line = line.strip()
if line:
rec = json.loads(line)
seen.add(rec["slug"])
existing.append(rec)
logger.info("Loaded %d existing slugs from %s", len(seen), out_file)
new_records: list[dict] = []
new_records += pass_menus(seen)
new_records += pass_product_api(seen)
new_records += pass_category_pages(seen)
out_file.parent.mkdir(parents=True, exist_ok=True)
with open(out_file, "a") as f:
for rec in new_records:
f.write(json.dumps(rec) + "\n")
total = len(existing) + len(new_records)
logger.info(
"Done. %d new slugs written to %s (%d total).",
len(new_records), out_file, total,
)
def main() -> None:
parser = argparse.ArgumentParser(description="Discover Purple Carrot recipe slugs via Wayback")
parser.add_argument(
"--out",
type=Path,
default=DEFAULT_OUT,
help=f"Output JSONL manifest (default: {DEFAULT_OUT})",
)
parser.add_argument("--debug", action="store_true")
args = parser.parse_args()
logging.basicConfig(
level=logging.DEBUG if args.debug else logging.INFO,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
from scripts.pipeline.log_utils import attach_pipeline_log
attach_pipeline_log("discover_wayback")
discover(args.out)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,250 @@
"""Playwright scraper for live purplecarrot.com recipe pages.
Uses the slug inventory already in recipes_purplecarrot.parquet and fills in
the missing ingredients/instructions by hitting the live site directly.
Usage:
conda run -n cf python3 scripts/pipeline/purple_carrot/scrape_live.py \
[--out /Library/Assets/kiwi/pipeline/recipes_purplecarrot_live.parquet] \
[--delay 2.5] \
[--limit 20]
"""
from __future__ import annotations
import argparse
import json
import re
import time
from pathlib import Path
from typing import Any
import pandas as pd
from playwright.sync_api import sync_playwright, Page, TimeoutError as PWTimeout
# ── Config ─────────────────────────────────────────────────────────────────────
BASE_URL = "https://www.purplecarrot.com/recipe/{slug}"
DEFAULT_OUT = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot_live.parquet")
EXISTING_PARQUET = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot.parquet")
RENDER_WAIT_MS = 2500 # JS render settle time
NAV_TIMEOUT_MS = 20_000
# ── Page parser ────────────────────────────────────────────────────────────────
def _text(page: Page, selector: str) -> str:
el = page.query_selector(selector)
return el.inner_text().strip() if el else ""
def _texts(page: Page, selector: str) -> list[str]:
return [el.inner_text().strip() for el in page.query_selector_all(selector)]
def _parse_recipe(page: Page, slug: str, source_url: str) -> dict[str, Any] | None:
"""Extract structured recipe data from the rendered page."""
body = page.inner_text("body")
# Abort if we've been bounced to a generic listing / 404
if "Page Not Found" in body or slug not in page.url:
return None
# ── Title ──────────────────────────────────────────────────────────────────
# The <h1> on product pages tends to be the recipe name
title = (_text(page, "h1") or _text(page, "[class*='recipe-title']")).strip()
if not title:
# Fallback: first heading-like text before "Ingredients"
idx = body.find("Ingredients\n")
title = body[:idx].strip().splitlines()[-1] if idx > 0 else ""
# ── Ingredients / Instructions via body text ───────────────────────────────
ing_start = body.find("\nIngredients\n")
inst_start = body.find("\nInstructions\n")
footer_start = body.find("\nShop\n") # footer sentinel
if ing_start == -1:
return None # page didn't render recipe content
raw_ingredients: list[str] = []
raw_instructions: list[str] = []
if ing_start != -1 and inst_start != -1:
ing_block = body[ing_start + len("\nIngredients\n"):inst_start].strip()
raw_ingredients = [l.strip() for l in ing_block.splitlines() if l.strip()]
if inst_start != -1:
end = footer_start if footer_start > inst_start else len(body)
inst_block = body[inst_start + len("\nInstructions\n"):end].strip()
# Steps start with a digit
steps: list[str] = []
current: list[str] = []
for line in inst_block.splitlines():
line = line.strip()
if not line:
continue
if re.match(r"^\d+$", line):
if current:
steps.append(" ".join(current))
current = []
elif line.startswith("CULINARY NOTES"):
break
else:
current.append(line)
if current:
steps.append(" ".join(current))
raw_instructions = steps
# ── Nutrition ──────────────────────────────────────────────────────────────
def _extract_num(pattern: str) -> float | None:
m = re.search(pattern, body)
try:
return float(m.group(1)) if m else None
except ValueError:
return None
cal = _extract_num(r"(\d+)\s*CAL")
fat = _extract_num(r"(\d+(?:\.\d+)?)g\s*FAT")
carbs = _extract_num(r"(\d+(?:\.\d+)?)g\s*CARBS")
prot = _extract_num(r"(\d+(?:\.\d+)?)g\s*PROTEIN")
fiber = _extract_num(r"(\d+(?:\.\d+)?)g\s*FIBER")
# ── Allergens / tags ───────────────────────────────────────────────────────
allergen_m = re.search(r"Allergens?:\s*([^\n]+)", body)
allergens = allergen_m.group(1).strip() if allergen_m else ""
# Feature tags like HIGH-PROTEIN, QUICK, etc. appear before Ingredients
pre_ing = body[:ing_start]
tags = re.findall(r"\b(HIGH-PROTEIN|QUICK|SPICY|LOW[\-\s]CALORIE|VEGAN|FAMILY\s+FRIENDLY)\b", pre_ing)
return {
"Slug": slug,
"Name": title,
"SourceURL": source_url,
"Source": "purplecarrot_live",
"RecipeIngredientParts": raw_ingredients,
"RecipeInstructions": raw_instructions,
"Calories": cal,
"FatContent": fat,
"CarbohydrateContent": carbs,
"ProteinContent": prot,
"FiberContent": fiber,
"Allergens": allergens,
"Keywords": tags,
"HasFullRecipe": bool(raw_ingredients and raw_instructions),
}
# ── Main ───────────────────────────────────────────────────────────────────────
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument("--out", type=Path, default=DEFAULT_OUT)
parser.add_argument("--delay", type=float, default=2.5,
help="Seconds between requests (be polite)")
parser.add_argument("--limit", type=int, default=0,
help="Stop after N slugs (0 = all)")
parser.add_argument("--resume", action="store_true",
help="Skip slugs already present in --out")
parser.add_argument("--slugs-from", type=Path, default=None,
help="Read slug inventory from this parquet instead of the default Wayback one")
args = parser.parse_args()
# Load slug inventory — either from a custom parquet or the default Wayback run
slugs_parquet = args.slugs_from if args.slugs_from else EXISTING_PARQUET
df_existing = pd.read_parquet(slugs_parquet)
slugs = df_existing["Slug"].dropna().unique().tolist()
# source_urls may not be present in custom parcets — fall back to constructing from slug
if "SourceURL" in df_existing.columns:
source_urls = dict(zip(df_existing["Slug"], df_existing["SourceURL"]))
else:
source_urls = {s: BASE_URL.format(slug=s) for s in slugs}
# Resume support
done_slugs: set[str] = set()
if args.resume and args.out.exists():
df_done = pd.read_parquet(args.out)
done_slugs = set(df_done["Slug"].dropna().tolist())
print(f"Resuming — {len(done_slugs)} slugs already scraped")
if args.limit:
slugs = slugs[: args.limit]
results: list[dict[str, Any]] = []
skipped = 0
failed = 0
_UA = (
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
)
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
for i, slug in enumerate(slugs):
if slug in done_slugs:
skipped += 1
continue
url = BASE_URL.format(slug=slug)
print(f"[{i+1}/{len(slugs)}] {slug}", end="", flush=True)
# Use a fresh browser context per slug to avoid Cloudflare session-level
# bot detection, which fires on the 2nd+ request in the same context.
context = browser.new_context(
user_agent=_UA,
viewport={"width": 1280, "height": 900},
)
page = context.new_page()
try:
page.goto(url, timeout=NAV_TIMEOUT_MS, wait_until="domcontentloaded")
page.wait_for_timeout(RENDER_WAIT_MS)
recipe = _parse_recipe(page, slug, source_urls.get(slug, url))
except PWTimeout:
print("TIMEOUT")
failed += 1
except Exception as exc:
print(f"ERROR: {exc}")
failed += 1
else:
if recipe is None:
print("no content (404 or redirect)")
failed += 1
elif recipe["HasFullRecipe"]:
n = len(recipe["RecipeIngredientParts"])
s = len(recipe["RecipeInstructions"])
print(f"OK ({n} ingredients, {s} steps)")
results.append(recipe)
else:
print(f"partial (ings={len(recipe['RecipeIngredientParts'])}, steps={len(recipe['RecipeInstructions'])})")
results.append(recipe)
finally:
context.close()
time.sleep(args.delay)
browser.close()
print(f"\nDone — {len(results)} scraped, {skipped} skipped, {failed} failed")
if results:
df_out = pd.DataFrame(results)
# Merge with existing metadata (nutrition stubs, wayback fields) for slugs
# that didn't previously have full data
args.out.parent.mkdir(parents=True, exist_ok=True)
if args.resume and args.out.exists():
df_prev = pd.read_parquet(args.out)
df_out = pd.concat([df_prev, df_out], ignore_index=True)
df_out = df_out.drop_duplicates(subset=["Slug"], keep="last")
df_out.to_parquet(args.out, index=False)
full_count = df_out["HasFullRecipe"].sum() if "HasFullRecipe" in df_out.columns else "?"
print(f"Saved {len(df_out)} rows to {args.out} ({full_count} with full recipes)")
else:
print("No results — output not written")
if __name__ == "__main__":
main()

View file

@ -0,0 +1,538 @@
"""
scrape_recipes.py fetch full recipe data for slugs in pc_slugs.jsonl.
For each slug:
1. Try Wayback /api/v1/products/<slug> oldest capture first (pre-HelloFresh
acquisition data is more complete).
2. If instructions are empty, try the recipe HTML page via Wayback and parse
inline JSON state or structured markup.
3. Merge with metadata already in the manifest (title, tags, cook_time, etc.)
4. Emit one row per recipe to recipes_purplecarrot.parquet in food.com columnar
format so build_recipe_index.py can import it unchanged.
Output columns (food.com schema + PC extras ignored by the indexer):
RecipeId, Name, Subtitle, RecipeIngredientParts, RecipeInstructions,
RecipeCategory, Keywords, Calories, FatContent, ProteinContent,
SodiumContent, SugarContent, CarbohydrateContent, FiberContent,
RecipeServings, Description, ImageURL, CookTime, Slug, Source
Usage:
conda run -n cf python -m scripts.pipeline.purple_carrot.scrape_recipes
conda run -n cf python -m scripts.pipeline.purple_carrot.scrape_recipes \\
--slugs /Library/Assets/kiwi/pipeline/pc_slugs.jsonl \\
--out /Library/Assets/kiwi/pipeline/recipes_purplecarrot.parquet \\
--resume
"""
from __future__ import annotations
import argparse
import json
import logging
import re
import time
from pathlib import Path
from typing import Any
import requests
logger = logging.getLogger(__name__)
CDX_BASE = "https://web.archive.org/cdx/search/cdx"
WB_BASE = "https://web.archive.org/web"
PC_HOST = "www.purplecarrot.com"
REPLAY_DELAY = 2.0
CDX_DELAY = 3.0 # archive.org CDX rate-limits aggressively; be polite
DEFAULT_SLUGS = Path("/Library/Assets/kiwi/pipeline/pc_slugs.jsonl")
DEFAULT_OUT = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot.parquet")
# Inline JSON state embedded by the SSR renderer — used as fallback HTML parser
_NEXT_DATA_RE = re.compile(r'<script id="__NEXT_DATA__"[^>]*>(.*?)</script>', re.DOTALL)
_REDUX_STATE_RE = re.compile(r'window\.__INITIAL_STATE__\s*=\s*(\{.*?\});\s*\n', re.DOTALL)
# ── Wayback helpers ───────────────────────────────────────────────────────────
def _cdx_get(params: dict) -> list:
"""CDX request with retry on 429/503 (archive.org rate-limits aggressively)."""
for attempt in range(4):
try:
resp = requests.get(CDX_BASE, params=params, timeout=25)
if resp.status_code in (429, 503):
wait = 15 * (2 ** attempt)
logger.debug("CDX %s — backing off %ds", resp.status_code, wait)
time.sleep(wait)
continue
resp.raise_for_status()
rows = resp.json()
return rows if rows else []
except Exception as exc:
logger.debug("CDX attempt %d failed: %s", attempt + 1, exc)
time.sleep(5 * (attempt + 1))
return []
def _cdx_timestamps(slug: str) -> list[str]:
"""Return captured timestamps for a product slug, oldest first (pre-2022 window)."""
rows = _cdx_get({
"url": f"{PC_HOST}/api/v1/products/{slug}",
"output": "json",
"fl": "timestamp,statuscode",
"filter": "statuscode:200",
"limit": "20",
# Pre-HelloFresh-acquisition captures (2019-2021) are most likely
# to have full instructions — API stripped them post-acquisition.
"from": "20190101",
"to": "20211231",
})
if len(rows) < 2:
return []
return [row[0] for row in rows[1:]] # timestamps only, oldest first
def _wayback_json(url: str, timestamp: str) -> Any | None:
replay = f"{WB_BASE}/{timestamp}/{url}"
for attempt in range(3):
try:
resp = requests.get(replay, timeout=30)
if resp.status_code == 200:
return resp.json()
if resp.status_code in (404, 410):
return None
except Exception as exc:
logger.debug("Wayback JSON attempt %d failed (%s): %s", attempt + 1, url, exc)
time.sleep(2 ** attempt)
return None
def _wayback_html(url: str, timestamp: str) -> str | None:
replay = f"{WB_BASE}/{timestamp}/{url}"
for attempt in range(3):
try:
resp = requests.get(replay, timeout=30)
if resp.status_code == 200:
return resp.text
if resp.status_code in (404, 410):
return None
except Exception as exc:
logger.debug("Wayback HTML attempt %d failed (%s): %s", attempt + 1, url, exc)
time.sleep(2 ** attempt)
return None
# ── Recipe extraction from API JSON ──────────────────────────────────────────
def _extract_from_api(data: dict) -> dict | None:
"""Parse a /api/v1/products/<slug> response into our recipe dict.
Returns None if the response has no usable content (empty title, etc.).
Returns a partial dict if only some fields are populated caller merges
with manifest metadata.
"""
if not data or not isinstance(data, dict):
return None
title = data.get("title", "").strip()
subtitle = data.get("subtitle", "").strip()
slug = data.get("slug", "")
skus = data.get("skus") or []
sku = skus[0] if skus else {}
# Instructions: list of {step_number, title, description}
raw_instructions = sku.get("instructions") or []
steps: list[str] = []
for step in sorted(raw_instructions, key=lambda s: s.get("step_number", 0)):
parts = []
if step.get("title"):
parts.append(step["title"])
if step.get("description"):
parts.append(step["description"])
if parts:
steps.append(". ".join(parts))
# Ingredients: may be in ingredients_quantity or ingredients
raw_ingr = sku.get("ingredients_quantity") or sku.get("ingredients") or []
ingredients: list[str] = []
for item in raw_ingr:
if isinstance(item, dict):
qty = item.get("quantity") or item.get("qty") or ""
unit = item.get("unit") or ""
name = item.get("name") or item.get("ingredient", {}).get("name", "") if isinstance(item.get("ingredient"), dict) else item.get("ingredient_name", "")
raw = item.get("raw") or item.get("display_name") or ""
line = raw or " ".join(filter(None, [str(qty), str(unit), str(name)])).strip()
if line:
ingredients.append(line)
elif isinstance(item, str) and item.strip():
ingredients.append(item.strip())
nutrition = sku.get("nutrition_label") or {}
calories = _num(nutrition.get("calories") or sku.get("calories"))
fat = _num(nutrition.get("total_fat") or sku.get("fat"))
protein = _num(nutrition.get("protein") or sku.get("protein"))
sodium = _num(nutrition.get("sodium") or sku.get("sodium"))
sugar = _num(nutrition.get("sugar") or nutrition.get("total_sugars"))
carbs = _num(nutrition.get("total_carbohydrate") or sku.get("carbs"))
fiber = _num(nutrition.get("dietary_fiber") or sku.get("fiber"))
tags = sku.get("tags") or data.get("tags") or []
category = sku.get("meal_type") or sku.get("product_type") or ""
servings = _num(sku.get("servings"))
cook_time = sku.get("prep_and_cook_time") or ""
description = sku.get("description") or ""
images = sku.get("hero_images") or sku.get("image_versions") or []
# hero_images can be a list OR a dict keyed by size string — normalise to list
if isinstance(images, dict):
images = list(images.values())
image_url = ""
if images and isinstance(images[0], dict):
image_url = images[0].get("image_url") or images[0].get("url") or ""
if not image_url and data.get("square_image"):
sq = data["square_image"]
image_url = sq.get("url") if isinstance(sq, dict) else ""
return {
"slug": slug,
"title": title,
"subtitle": subtitle,
"steps": steps,
"ingredients": ingredients,
"category": category,
"tags": tags,
"calories": calories,
"fat": fat,
"protein": protein,
"sodium": sodium,
"sugar": sugar,
"carbs": carbs,
"fiber": fiber,
"servings": servings,
"cook_time": cook_time,
"description": description,
"image_url": image_url,
"has_full_recipe": bool(steps and ingredients),
}
def _num(val: Any) -> float | None:
if val is None:
return None
try:
v = float(str(val).replace("g", "").replace("mg", "").split()[0])
return v if v > 0 else None
except Exception:
return None
# ── Fallback: HTML inline state parsing ──────────────────────────────────────
def _extract_from_html(html: str, slug: str) -> dict | None:
"""Try to pull recipe data from inline JS state in older SSR pages."""
# Attempt 1: Next.js __NEXT_DATA__
m = _NEXT_DATA_RE.search(html)
if m:
try:
state = json.loads(m.group(1))
# Walk the Next.js page props tree looking for recipe data
props = state.get("props", {}).get("pageProps", {})
recipe = props.get("recipe") or props.get("product")
if recipe and isinstance(recipe, dict) and recipe.get("title"):
return _extract_from_api(recipe)
except Exception:
pass
# Attempt 2: Redux __INITIAL_STATE__
m = _REDUX_STATE_RE.search(html)
if m:
try:
state = json.loads(m.group(1))
# Try common Redux state shapes
for key in ("recipe", "product", "currentRecipe", "currentProduct"):
recipe = state.get(key)
if recipe and isinstance(recipe, dict) and recipe.get("title"):
return _extract_from_api(recipe)
except Exception:
pass
# Attempt 3: JSON-LD structured data
ld_matches = re.findall(
r'<script[^>]+type=["\']application/ld\+json["\'][^>]*>(.*?)</script>',
html, re.DOTALL
)
for raw in ld_matches:
try:
ld = json.loads(raw)
if isinstance(ld, list):
ld = next((x for x in ld if x.get("@type") == "Recipe"), None)
if not ld or ld.get("@type") != "Recipe":
continue
steps = []
for inst in (ld.get("recipeInstructions") or []):
if isinstance(inst, dict):
steps.append(inst.get("text", ""))
elif isinstance(inst, str):
steps.append(inst)
ingredients = ld.get("recipeIngredient") or []
return {
"slug": slug,
"title": ld.get("name", ""),
"subtitle": "",
"steps": [s for s in steps if s],
"ingredients": [i for i in ingredients if i],
"category": ld.get("recipeCategory", ""),
"tags": ld.get("keywords", "").split(",") if isinstance(ld.get("keywords"), str) else [],
"calories": _num((ld.get("nutrition") or {}).get("calories")),
"fat": None, "protein": None, "sodium": None,
"sugar": None, "carbs": None, "fiber": None,
"servings": _num(ld.get("recipeYield")),
"cook_time": str(ld.get("totalTime") or ld.get("cookTime") or ""),
"description": ld.get("description", ""),
"image_url": (ld["image"][0] if isinstance(ld.get("image"), list) else ld.get("image", "")) or "",
"has_full_recipe": True,
}
except Exception:
pass
return None
# ── Per-slug fetch ─────────────────────────────────────────────────────────────
def fetch_recipe(slug: str, manifest_meta: dict) -> dict | None:
"""Fetch the fullest available recipe data for a slug from Wayback.
Returns a merged dict of manifest metadata + API/HTML-extracted content.
"""
api_url = f"https://{PC_HOST}/api/v1/products/{slug}"
html_url = f"https://{PC_HOST}/recipe/{slug}"
recipe: dict | None = None
# Try product API — oldest captures are most likely to have full data
timestamps = _cdx_timestamps(slug)
time.sleep(CDX_DELAY)
if not timestamps and manifest_meta.get("wayback_ts"):
timestamps = [manifest_meta["wayback_ts"]]
for ts in timestamps:
data = _wayback_json(api_url, ts)
time.sleep(REPLAY_DELAY)
if not data:
continue
candidate = _extract_from_api(data)
if not candidate:
continue
recipe = candidate
if recipe.get("has_full_recipe"):
logger.debug("[%s] Full recipe from API (ts=%s)", slug, ts)
break
logger.debug("[%s] Partial API data (ts=%s) — trying HTML fallback", slug, ts)
# HTML fallback when API has no steps/ingredients
if not recipe or not recipe.get("has_full_recipe"):
html_ts_rows = _cdx_get({
"url": f"{PC_HOST}/recipe/{slug}",
"output": "json",
"fl": "timestamp,statuscode",
"filter": "statuscode:200",
"limit": "10",
})
html_timestamps = [row[0] for row in html_ts_rows[1:]] if len(html_ts_rows) > 1 else []
time.sleep(CDX_DELAY)
for ts in html_timestamps:
html = _wayback_html(html_url, ts)
time.sleep(REPLAY_DELAY)
if not html:
continue
html_recipe = _extract_from_html(html, slug)
if html_recipe and html_recipe.get("has_full_recipe"):
logger.debug("[%s] Full recipe from HTML (ts=%s)", slug, ts)
recipe = html_recipe
break
# Build merged record: manifest metadata fills any gaps from API/HTML
merged: dict = {
"slug": slug,
"title": manifest_meta.get("title", ""),
"subtitle": manifest_meta.get("subtitle", ""),
"steps": [],
"ingredients": [],
"category": "",
"tags": manifest_meta.get("tags") or [],
"calories": None,
"fat": None,
"protein": None,
"sodium": None,
"sugar": None,
"carbs": None,
"fiber": None,
"servings": manifest_meta.get("serving_size"),
"cook_time": manifest_meta.get("cook_time", ""),
"description": manifest_meta.get("description", ""),
"image_url": manifest_meta.get("image_url", ""),
"source": "purple_carrot",
"wayback_ts": manifest_meta.get("wayback_ts", ""),
"has_full_recipe": False,
}
if recipe:
for key in recipe:
# Prefer API/HTML data; keep manifest value only when API field is empty
val = recipe[key]
if val or key not in merged or not merged[key]:
merged[key] = val
if not merged["title"]:
logger.warning("[%s] No title — skipping", slug)
return None
return merged
# ── Output formatting ─────────────────────────────────────────────────────────
def _to_dataframe_row(r: dict) -> dict:
"""Convert merged recipe dict to food.com-compatible parquet row."""
# Build plain-text input for allrecipes-style corpus compatibility
lines = [r["title"]]
if r.get("subtitle"):
lines.append(r["subtitle"])
if r.get("description"):
lines.append("")
lines.append(r["description"])
if r.get("ingredients"):
lines += ["", "Ingredients:"] + [f"- {i}" for i in r["ingredients"]]
if r.get("steps"):
lines += ["", "Directions:"] + [f"- {s}" for s in r["steps"]]
plain_text = "\n".join(lines)
source_url = f"https://www.purplecarrot.com/recipe/{r['slug']}"
return {
# food.com schema columns (used by build_recipe_index.py)
"RecipeId": f"pc_{r['slug']}",
"Name": r["title"],
"RecipeIngredientParts": r.get("ingredients") or [],
"RecipeInstructions": r.get("steps") or [],
"RecipeCategory": r.get("category", ""),
"Keywords": r.get("tags") or [],
"Calories": r.get("calories"),
"FatContent": r.get("fat"),
"ProteinContent": r.get("protein"),
"SodiumContent": r.get("sodium"),
"SugarContent": r.get("sugar"),
"CarbohydrateContent": r.get("carbs"),
"FiberContent": r.get("fiber"),
"RecipeServings": r.get("servings"),
# PC-specific extras (ignored by indexer, used by training pipeline)
"Subtitle": r.get("subtitle", ""),
"Description": r.get("description", ""),
"ImageURL": r.get("image_url", ""),
"CookTime": r.get("cook_time", ""),
"Slug": r["slug"],
"Source": "purple_carrot",
"SourceURL": source_url, # canonical attribution link shown in recipe UI
"HasFullRecipe": r.get("has_full_recipe", False),
"WaybackTs": r.get("wayback_ts", ""),
# Also emit plain-text input for allrecipes-compatible corpus search
"input": plain_text,
}
# ── Main ──────────────────────────────────────────────────────────────────────
def scrape(slugs_file: Path, out_file: Path, resume: bool = True) -> None:
import pandas as pd
# Load manifest
if not slugs_file.exists():
logger.error("Slugs manifest not found: %s", slugs_file)
return
manifest: dict[str, dict] = {}
with open(slugs_file) as f:
for line in f:
line = line.strip()
if line:
rec = json.loads(line)
slug = rec["slug"]
# Keep the richest metadata if slug appears from multiple sources
if slug not in manifest or rec.get("source") == "menu":
manifest[slug] = rec
logger.info("Manifest: %d unique slugs", len(manifest))
# Load already-scraped slugs for resume
done_slugs: set[str] = set()
existing_rows: list[dict] = []
if resume and out_file.exists():
try:
existing_df = pd.read_parquet(out_file)
done_slugs = set(existing_df["Slug"].tolist())
existing_rows = existing_df.to_dict("records")
logger.info("Resume: %d already scraped", len(done_slugs))
except Exception as exc:
logger.warning("Could not load existing parquet for resume: %s", exc)
todo = [s for s in manifest if s not in done_slugs]
logger.info("%d slugs to fetch", len(todo))
rows = list(existing_rows)
for i, slug in enumerate(todo, 1):
logger.info("[%d/%d] %s", i, len(todo), slug)
recipe = fetch_recipe(slug, manifest[slug])
if recipe:
rows.append(_to_dataframe_row(recipe))
status = "full" if recipe.get("has_full_recipe") else "partial"
logger.info(" -> %s (%s)", recipe.get("title", "?"), status)
else:
logger.warning(" -> skipped (no title)")
# Write checkpoint every 50 recipes
if i % 50 == 0:
_write_parquet(rows, out_file)
logger.info("Checkpoint: %d recipes written", len(rows))
_write_parquet(rows, out_file)
full = sum(1 for r in rows if r.get("HasFullRecipe"))
logger.info(
"Done. %d recipes written to %s (%d full, %d partial).",
len(rows), out_file, full, len(rows) - full,
)
def _write_parquet(rows: list[dict], out_file: Path) -> None:
import pandas as pd
out_file.parent.mkdir(parents=True, exist_ok=True)
pd.DataFrame(rows).to_parquet(out_file, index=False)
def main() -> None:
parser = argparse.ArgumentParser(description="Scrape Purple Carrot recipes from Wayback")
parser.add_argument("--slugs", type=Path, default=DEFAULT_SLUGS)
parser.add_argument("--out", type=Path, default=DEFAULT_OUT)
parser.add_argument(
"--no-resume", dest="resume", action="store_false",
help="Start fresh (ignore existing parquet)",
)
parser.add_argument("--debug", action="store_true")
args = parser.parse_args()
logging.basicConfig(
level=logging.DEBUG if args.debug else logging.INFO,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
from scripts.pipeline.log_utils import attach_pipeline_log
attach_pipeline_log("scrape_recipes")
scrape(args.slugs, args.out, resume=args.resume)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,41 @@
#!/usr/bin/env bash
# Weekly Purple Carrot recipe harvest
# Runs every Sunday night via cron.
# Discovers this week's menu and scrapes full recipe data.
# Logs to /Library/Assets/kiwi/pipeline/logs/purple_carrot_harvest.log
set -euo pipefail
REPO="/Library/Development/CircuitForge/kiwi"
MENU_OUT="/Library/Assets/kiwi/pipeline/recipes_purplecarrot_menu.parquet"
LIVE_OUT="/Library/Assets/kiwi/pipeline/recipes_purplecarrot_live.parquet"
LOG_DIR="/Library/Assets/kiwi/pipeline/logs"
LOG="$LOG_DIR/purple_carrot_harvest.log"
mkdir -p "$LOG_DIR"
echo "=== Purple Carrot harvest $(date -u '+%Y-%m-%d %H:%M UTC') ===" >> "$LOG"
cd "$REPO"
# Step 1: discover this week's menu slugs
echo "[1/2] Discovering current menu slugs..." | tee -a "$LOG"
conda run -n cf python3 scripts/pipeline/purple_carrot/discover_current_menu.py \
--out "$MENU_OUT" 2>&1 | tee -a "$LOG"
# Step 2: scrape full recipe data for new slugs only (--resume skips already-scraped)
echo "[2/2] Scraping live recipe pages..." | tee -a "$LOG"
conda run -n cf python3 scripts/pipeline/purple_carrot/scrape_live.py \
--slugs-from "$MENU_OUT" \
--out "$LIVE_OUT" \
--resume \
--delay 3.0 2>&1 | tee -a "$LOG"
# Step 3: ingest new recipes into the shared corpus DB
echo "[3/3] Ingesting into corpus DB..." | tee -a "$LOG"
conda run -n cf python3 scripts/pipeline/ingest_purplecarrot.py \
--parquet "$LIVE_OUT" \
--db /Library/Assets/kiwi/kiwi.db 2>&1 | tee -a "$LOG"
echo "=== Done $(date -u '+%Y-%m-%d %H:%M UTC') ===" >> "$LOG"
echo "" >> "$LOG"