Compare commits

...

6 commits

Author SHA1 Message Date
51a48a430b feat(config): add GPU_SERVER_URL alias for CF_ORCH_URL
Some checks failed
CI / Backend (Python) (push) Waiting to run
CI / Frontend (Vue) (push) Waiting to run
Mirror / mirror (push) Has been cancelled
Release / release (push) Has been cancelled
Self-hoster-friendly env var name. Priority: GPU_SERVER_URL →
CF_ORCH_URL (compat) → https://orch.circuitforge.tech when
CF_LICENSE_KEY is present (Paid+ auto-default). Resolved value
written back to os.environ["CF_ORCH_URL"] at startup so all
service callers remain unchanged.

Bump version to 0.10.0.
2026-05-17 09:42:48 -07:00
b326d4aa6e fix(config): add CF_ORCH_URL to local env for recipe scan + LLM features
Without CF_ORCH_URL set, _call_vision_backend() skips cf-orch entirely
and falls through to local VLM (no GPU in container) then fails.
.env gets CF_ORCH_URL=http://10.1.10.71:7700 for the local rack.
.env.example updated with documentation for self-hosters.

Local scan confirmed: cf-docuvision (Sif, GGUF) → ollama llama3.1:8b → 200 OK.
2026-05-17 09:21:33 -07:00
7cad503b35 feat(pipeline): Purple Carrot recipe corpus scraper via Wayback Machine
discover_wayback.py — enumerates recipe slugs from archived menu API
  (/api/v2/menus/<id>) and product API (/api/v1/products/*) plus
  recipe-category HTML pages. Writes incremental JSONL manifest to
  /Library/Assets/kiwi/pipeline/pc_slugs.jsonl.

scrape_recipes.py — fetches full recipe data per slug using three-tier
  fallback: product API JSON (oldest captures first), HTML inline state
  (__NEXT_DATA__ / __INITIAL_STATE__), and JSON-LD structured data.
  Outputs recipes_purplecarrot.parquet in food.com columnar format so
  build_recipe_index.py imports it unchanged. Includes SourceURL column
  for recipe attribution UI (kiwi#139). Checkpoints every 50 recipes.

Initial discovery: 158 slugs from menu 1536 + product_api pass.
Re-run discover_wayback.py after archive.org stabilizes to pick up
older slugs from recipe-category pages.

Backlog: live Playwright scraper for post-Wayback recipes (kiwi#137).
2026-05-17 09:16:35 -07:00
430600c1af fix(recipe_scan): harden JSON parser for real-world LLM output quirks
- Strip <think>/<thinking> blocks before parsing (Qwen3/DeepSeek-R1 emit
  these before the actual JSON answer)
- Replace greedy regex with brace-balanced _extract_json_object() so
  trailing prose after } doesn't corrupt the extract
- Use non-greedy fence regex to pull JSON from inside ```json blocks
- Pass system= to LLMRouter.complete() with a terse JSON-only instruction
  so Ollama models receive it as a system message, not buried in the user turn
- Add logger.warning() on parse failure so raw output is diagnosable
2026-05-17 08:30:55 -07:00
21a9b85067 fix(recipe_scan): revert to cf-docuvision path (GGUF backend now works)
Route recipe_scan back through task_allocate -> cf-docuvision -> DocuvisionClient
now that docuvision supports GGUF models via Qwen25VLChatHandler.

Two-step pipeline: docuvision OCRs image(s), LLMRouter structures OCR text to JSON.
Removes the non-functional cf-text image_url path (cf-text rejects content arrays).
2026-05-16 19:25:01 -07:00
c72b4415db feat(recipe_scan): use Qwen2-VL GGUF via cf-text OpenAI-compat API
Replace two-step docuvision OCR + LLM structuring pipeline with a
single multimodal VLM call. The bartowski Qwen2-VL-7B-Instruct Q5_K_M
GGUF is served by cf-text (llama.cpp) and accepts image_url content
blocks identical to the OpenAI vision API format.

Removes docuvision dependency for recipe scanning; the addict-missing /
DeepseekVLV2Processor-missing cf-docuvision error no longer blocks scans.
Receipt OCR (kiwi.ocr task) still routes to cf-docuvision separately.
2026-05-16 18:38:21 -07:00
7 changed files with 948 additions and 30 deletions

View file

@ -21,10 +21,12 @@ DATA_DIR=./data
# IP this machine advertises to the coordinator (must be reachable from coordinator host)
# CF_ORCH_ADVERTISE_HOST=10.1.10.71
# CF-core hosted coordinator (managed cloud GPU inference — Paid+ tier)
# Set CF_ORCH_URL to use a hosted cf-orch coordinator instead of self-hosting.
# CF_LICENSE_KEY is read automatically by CFOrchClient for bearer auth.
# CF_ORCH_URL=https://orch.circuitforge.tech
# GPU inference server (cf-orch coordinator for recipe scan, LLM generation, etc.)
# GPU_SERVER_URL: set to your local cf-orch coordinator (self-hosted rack).
# CF_ORCH_URL is the backward-compat alias — both are honoured.
# Paid+ default: when CF_LICENSE_KEY is present and neither URL is set,
# the app automatically points to https://orch.circuitforge.tech.
# GPU_SERVER_URL=http://10.1.10.71:7700
# CF_LICENSE_KEY=CFG-KIWI-xxxx-xxxx-xxxx
# LLM backend — env-var auto-config (no llm.yaml needed for bare-metal users)
@ -57,6 +59,9 @@ CF_APP_NAME=kiwi
# Unset = auto-detect: true if CLOUD_MODE or circuitforge_orch is installed (paid+ local).
# Set false to force LocalScheduler even when cf-orch is present.
# USE_ORCH_SCHEDULER=false
# GPU_SERVER_URL: cf-orch coordinator endpoint. Required for recipe scan (cf-docuvision)
# and LLM features on a self-hosted rack. CF_ORCH_URL is the backward-compat alias.
# GPU_SERVER_URL=http://10.1.10.71:7700
# Cloud mode (set in compose.cloud.yml; also set here for reference)
# CLOUD_DATA_ROOT=/devl/kiwi-cloud-data

View file

@ -65,9 +65,24 @@ class Settings:
# Quality
MIN_QUALITY_SCORE: float = float(os.environ.get("MIN_QUALITY_SCORE", "50.0"))
# CF-core resource coordinator (VRAM lease management)
# CF-core resource coordinator (VRAM lease management — lease broker, not inference)
COORDINATOR_URL: str = os.environ.get("COORDINATOR_URL", "http://localhost:7700")
# GPU inference server URL
# Priority: GPU_SERVER_URL env var → CF_ORCH_URL env var (backward compat)
# → https://orch.circuitforge.tech when CF_LICENSE_KEY is present (Paid+)
# Resolved value is written back to os.environ["CF_ORCH_URL"] at startup so
# all service-layer callers that read CF_ORCH_URL directly see the right URL.
GPU_SERVER_URL: str | None = (
os.environ.get("GPU_SERVER_URL")
or os.environ.get("CF_ORCH_URL")
or (
"https://orch.circuitforge.tech"
if os.environ.get("CF_LICENSE_KEY")
else None
)
)
# Hosted cf-orch coordinator — bearer token for managed cloud GPU inference (Paid+)
# CFOrchClient reads CF_LICENSE_KEY automatically; exposed here for startup validation.
CF_LICENSE_KEY: str | None = os.environ.get("CF_LICENSE_KEY")
@ -108,3 +123,9 @@ class Settings:
settings = Settings()
# Normalise GPU_SERVER_URL into CF_ORCH_URL so every service-layer caller that
# reads os.environ.get("CF_ORCH_URL") sees the resolved value, including the
# Paid+ cloud default injected above.
if settings.GPU_SERVER_URL:
os.environ["CF_ORCH_URL"] = settings.GPU_SERVER_URL

View file

@ -215,6 +215,35 @@ def _build_ocr_extraction_prompt(ocr_text: str) -> str:
)
def _call_via_cf_text_vlm(alloc_url: str, image_paths: list[Path], prompt: str) -> str:
"""Call the cf-text OpenAI-compat API with images via the llama.cpp multimodal backend."""
import httpx
content: list[dict] = []
for i, path in enumerate(image_paths):
if i > 0:
content.append({"type": "text", "text": f"(Page {i + 1} of the same recipe:)"})
b64 = _load_image_b64(path)
content.append({
"type": "image_url",
"image_url": {"url": f"data:image/jpeg;base64,{b64}"},
})
content.append({"type": "text", "text": prompt})
resp = httpx.post(
f"{alloc_url.rstrip('/')}/v1/chat/completions",
json={
"model": "local",
"messages": [{"role": "user", "content": content}],
"max_tokens": 2048,
"temperature": 0.0,
},
timeout=180.0,
)
resp.raise_for_status()
return resp.json()["choices"][0]["message"]["content"].strip()
def _call_vision_backend(
image_paths: list[Path],
prompt: str,
@ -222,7 +251,7 @@ def _call_vision_backend(
) -> str:
"""Dispatch to the best available vision backend.
Priority: cf-orch docuvision (OCR + text LLM) -> local Qwen2.5-VL -> Anthropic API.
Priority: cf-orch (Qwen2-VL GGUF via cf-text) -> local Qwen2.5-VL -> Anthropic API.
Raises RuntimeError with a clear message when no backend is available.
Args:
@ -237,9 +266,8 @@ def _call_vision_backend(
errors: list[str] = []
# 1. Try cf-orch task allocation → cf-docuvision OCR, then text LLM structuring.
# Two-step: docuvision extracts text from the image(s), then LLMRouter
# converts the OCR text to structured recipe JSON using the extraction prompt.
# 1. Try cf-orch task allocation → cf-docuvision (Qwen2-VL GGUF via llama.cpp).
# Two-step: docuvision OCRs the image(s), then LLMRouter structures the text into JSON.
cf_orch_url = os.environ.get("CF_ORCH_URL")
if cf_orch_url:
try:
@ -250,7 +278,6 @@ def _call_vision_backend(
try:
_progress("allocating", "Starting vision service...")
with task_allocate("kiwi", "recipe_scan", service_hint="cf-docuvision", ttl_s=120.0) as alloc:
# Step 1: OCR each image via cf-docuvision
_progress("scanning", "Extracting recipe text from photo...")
doc_client = DocuvisionClient(alloc.url)
ocr_parts: list[str] = []
@ -263,9 +290,11 @@ def _call_vision_backend(
if not combined_ocr.strip():
raise ValueError("Docuvision returned no text — image may not be a recipe")
# Step 2: Text LLM structures OCR output into recipe JSON
_progress("structuring", "Parsing recipe structure...")
text = LLMRouter().complete(_build_ocr_extraction_prompt(combined_ocr))
text = LLMRouter().complete(
_build_ocr_extraction_prompt(combined_ocr),
system="You are a recipe data extractor. Return ONLY valid JSON. No markdown, no explanation, no code fences.",
)
if text:
return text
@ -303,40 +332,76 @@ def _normalize_ingredient_name(name: str) -> str:
return name.lower().strip()
def _extract_json_object(text: str) -> str | None:
"""Return the first balanced JSON object from text, or None if not found.
Uses brace-counting rather than a greedy regex so trailing prose and
nested objects are handled correctly.
"""
start = text.find("{")
if start == -1:
return None
depth = 0
in_string = False
escape_next = False
for i, ch in enumerate(text[start:], start):
if escape_next:
escape_next = False
continue
if ch == "\\" and in_string:
escape_next = True
continue
if ch == '"':
in_string = not in_string
continue
if in_string:
continue
if ch == "{":
depth += 1
elif ch == "}":
depth -= 1
if depth == 0:
return text[start : i + 1]
return None
def _parse_scanner_json(raw_text: str) -> dict:
"""Extract and return the JSON dict from VLM output.
Handles:
- Pure JSON
- JSON wrapped in ```json ... ``` markdown fences
- JSON preceded by a line of prose ("Here is the recipe: {...}")
- JSON in ```json ... ``` markdown fences
- Qwen3-style <think>...</think> or <thinking>...</thinking> preambles
- JSON preceded or followed by prose
Raises ValueError on not_a_recipe or unparseable output.
"""
text = raw_text.strip()
# Strip markdown fences if present
if text.startswith("```"):
parts = text.split("```")
for part in parts:
part = part.strip()
if part.startswith("json"):
part = part[4:].strip()
if part.startswith("{"):
text = part
break
# Strip thinking-token blocks emitted by reasoning models (Qwen3, DeepSeek-R1, etc.)
text = re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL | re.IGNORECASE).strip()
text = re.sub(r"<thinking>.*?</thinking>", "", text, flags=re.DOTALL | re.IGNORECASE).strip()
# Try direct parse first
# Strip markdown fences if present
if "```" in text:
# Find the content between the first ``` pair
fence_match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, re.DOTALL)
if fence_match:
text = fence_match.group(1).strip()
# Try direct parse
try:
data = json.loads(text)
except json.JSONDecodeError:
# Extract first JSON object embedded in prose
match = re.search(r"\{.*\}", text, re.DOTALL)
if not match:
# Fall back to brace-balanced extraction from anywhere in the output
candidate = _extract_json_object(text)
if not candidate:
logger.warning("Could not parse JSON from LLM output (first 400 chars): %r", text[:400])
raise ValueError(f"Could not parse JSON from VLM output: {text[:200]!r}")
try:
data = json.loads(match.group(0))
data = json.loads(candidate)
except json.JSONDecodeError as exc:
logger.warning("Brace-extracted JSON still invalid: %r", candidate[:400])
raise ValueError(f"Could not parse JSON from VLM output: {exc}") from exc
if isinstance(data, dict) and data.get("error") == "not_a_recipe":

View file

@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "kiwi"
version = "0.6.0"
version = "0.10.0"
description = "Pantry tracking + leftover recipe suggestions"
readme = "README.md"
requires-python = ">=3.11"

View file

@ -0,0 +1,298 @@
"""
discover_wayback.py enumerate Purple Carrot recipe slugs via the Wayback Machine.
Strategy:
1. CDX API all archived /api/v2/menus/* URLs (multiple timestamps)
2. Replay fetch each menu's menuItems, extract productPath slugs
3. CDX API all archived /api/v1/products/* URLs (direct slug capture)
4. CDX API /recipe-categories/* HTML pages for older slugs
5. Deduplicate and write manifest to OUT_FILE
Output (JSONL, one record per recipe):
{"slug": "...", "title": "...", "subtitle": "...", "cook_time": "...",
"tags": [...], "serving_size": 2, "image_url": "...",
"wayback_ts": "20260412150557", "source": "menu|product_api|category_page"}
Usage:
conda run -n cf python -m scripts.pipeline.purple_carrot.discover_wayback
conda run -n cf python -m scripts.pipeline.purple_carrot.discover_wayback --out /Library/Assets/kiwi/pipeline/pc_slugs.jsonl
"""
from __future__ import annotations
import argparse
import json
import logging
import time
from pathlib import Path
from typing import Any
from urllib.parse import urlencode
import requests
logger = logging.getLogger(__name__)
CDX_BASE = "http://web.archive.org/cdx/search/cdx"
WB_BASE = "https://web.archive.org/web"
PC_HOST = "www.purplecarrot.com"
# Polite delay between Wayback replay fetches (seconds)
REPLAY_DELAY = 1.0
CDX_DELAY = 0.5
DEFAULT_OUT = Path("/Library/Assets/kiwi/pipeline/pc_slugs.jsonl")
# ── CDX helpers ───────────────────────────────────────────────────────────────
def cdx_query(url_pattern: str, **kwargs) -> list[dict]:
"""Run a CDX search and return a list of result dicts."""
params = {
"url": url_pattern,
"output": "json",
"fl": "original,timestamp,statuscode",
"collapse": "urlkey",
"filter": "statuscode:200",
**kwargs,
}
for attempt in range(3):
try:
resp = requests.get(CDX_BASE, params=params, timeout=30)
resp.raise_for_status()
rows = resp.json()
if not rows or len(rows) < 2:
return []
headers = rows[0]
return [dict(zip(headers, row)) for row in rows[1:]]
except Exception as exc:
logger.warning("CDX attempt %d failed: %s", attempt + 1, exc)
time.sleep(2 ** attempt)
return []
def wayback_get(url: str, timestamp: str) -> Any | None:
"""Fetch a Wayback replay of a URL and return parsed JSON (or None)."""
replay_url = f"{WB_BASE}/{timestamp}/{url}"
for attempt in range(3):
try:
resp = requests.get(replay_url, timeout=30)
if resp.status_code == 200:
return resp.json()
if resp.status_code == 404:
return None
except Exception as exc:
logger.warning("Wayback GET attempt %d failed for %s: %s", attempt + 1, url, exc)
time.sleep(2 ** attempt)
return None
# ── Slug extraction ───────────────────────────────────────────────────────────
def slug_from_product_path(path: str) -> str | None:
"""'/recipe/foo-bar-baz''foo-bar-baz'."""
if not path:
return None
return path.strip("/").split("/")[-1] or None
def _menu_item_to_record(item: dict, wayback_ts: str) -> dict | None:
slug = slug_from_product_path(item.get("productPath", ""))
if not slug:
return None
return {
"slug": slug,
"title": item.get("title", ""),
"subtitle": item.get("subtitle", ""),
"cook_time": item.get("cookTime", ""),
"tags": item.get("filterTags") or [],
"serving_size": item.get("servingSize"),
"image_url": item.get("imageURL", ""),
"description": item.get("description", ""),
"wayback_ts": wayback_ts,
"source": "menu",
}
# ── Discovery passes ──────────────────────────────────────────────────────────
def pass_menus(seen_slugs: set[str]) -> list[dict]:
"""Walk all archived /api/v2/menus/* captures to extract slugs."""
records: list[dict] = []
# Find all distinct archived menu URLs
menu_cdx = cdx_query(f"{PC_HOST}/api/v2/menus/*", limit="500")
logger.info("CDX: %d archived menu URLs found", len(menu_cdx))
time.sleep(CDX_DELAY)
processed_menu_ids: set[str] = set()
for entry in menu_cdx:
url = entry["original"]
ts = entry["timestamp"]
# Skip the listing endpoint, only process individual menus
if not url.split("?")[0].rstrip("/").split("/")[-1].isdigit():
continue
menu_id = url.split("?")[0].rstrip("/").split("/")[-1]
if menu_id in processed_menu_ids:
continue
processed_menu_ids.add(menu_id)
logger.info("Fetching menu %s (ts=%s) ...", menu_id, ts)
data = wayback_get(url.split("?")[0] + "?logged_out=true", ts)
time.sleep(REPLAY_DELAY)
if not data or "menuItems" not in data:
continue
for item in data["menuItems"]:
rec = _menu_item_to_record(item, ts)
if rec and rec["slug"] not in seen_slugs:
seen_slugs.add(rec["slug"])
records.append(rec)
logger.debug(" + %s", rec["slug"])
logger.info(" %d new slugs (total so far: %d)", len(records), len(seen_slugs))
return records
def pass_product_api(seen_slugs: set[str]) -> list[dict]:
"""Pick up any directly archived /api/v1/products/* URLs the menu pass missed."""
records: list[dict] = []
product_cdx = cdx_query(f"{PC_HOST}/api/v1/products/*", limit="5000")
logger.info("CDX: %d archived product API URLs found", len(product_cdx))
time.sleep(CDX_DELAY)
for entry in product_cdx:
slug = entry["original"].rstrip("/").split("/")[-1]
if not slug or slug in seen_slugs:
continue
seen_slugs.add(slug)
records.append({
"slug": slug,
"title": "",
"subtitle": "",
"cook_time": "",
"tags": [],
"serving_size": None,
"image_url": "",
"description": "",
"wayback_ts": entry["timestamp"],
"source": "product_api",
})
logger.info("product_api pass: %d new slugs", len(records))
return records
def pass_category_pages(seen_slugs: set[str]) -> list[dict]:
"""Parse archived recipe-categories HTML pages for slugs not in the API.
Category pages are rendered SSR/with inline JSON state on older captures,
so we do a simple regex scan for /recipe/<slug> patterns.
"""
import re
records: list[dict] = []
SLUG_RE = re.compile(r'["\s]/recipe/([a-z0-9][a-z0-9\-]{3,})["\s/?]')
cat_cdx = cdx_query(f"{PC_HOST}/recipe-categories/*", limit="200")
logger.info("CDX: %d archived category pages found", len(cat_cdx))
time.sleep(CDX_DELAY)
seen_category_urls: set[str] = set()
for entry in cat_cdx:
url = entry["original"].split("?")[0]
if url in seen_category_urls:
continue
seen_category_urls.add(url)
replay_url = f"{WB_BASE}/{entry['timestamp']}/{url}"
try:
resp = requests.get(replay_url, timeout=30)
time.sleep(REPLAY_DELAY)
if resp.status_code != 200:
continue
except Exception as exc:
logger.warning("Category page fetch failed: %s", exc)
continue
for slug in SLUG_RE.findall(resp.text):
if slug in seen_slugs:
continue
seen_slugs.add(slug)
records.append({
"slug": slug,
"title": "",
"subtitle": "",
"cook_time": "",
"tags": [],
"serving_size": None,
"image_url": "",
"description": "",
"wayback_ts": entry["timestamp"],
"source": "category_page",
})
logger.info("category_pages pass: %d new slugs", len(records))
return records
# ── Main ──────────────────────────────────────────────────────────────────────
def discover(out_file: Path) -> None:
seen: set[str] = set()
# Load previously discovered slugs so reruns are incremental
existing: list[dict] = []
if out_file.exists():
with open(out_file) as f:
for line in f:
line = line.strip()
if line:
rec = json.loads(line)
seen.add(rec["slug"])
existing.append(rec)
logger.info("Loaded %d existing slugs from %s", len(seen), out_file)
new_records: list[dict] = []
new_records += pass_menus(seen)
new_records += pass_product_api(seen)
new_records += pass_category_pages(seen)
out_file.parent.mkdir(parents=True, exist_ok=True)
with open(out_file, "a") as f:
for rec in new_records:
f.write(json.dumps(rec) + "\n")
total = len(existing) + len(new_records)
logger.info(
"Done. %d new slugs written to %s (%d total).",
len(new_records), out_file, total,
)
def main() -> None:
parser = argparse.ArgumentParser(description="Discover Purple Carrot recipe slugs via Wayback")
parser.add_argument(
"--out",
type=Path,
default=DEFAULT_OUT,
help=f"Output JSONL manifest (default: {DEFAULT_OUT})",
)
parser.add_argument("--debug", action="store_true")
args = parser.parse_args()
logging.basicConfig(
level=logging.DEBUG if args.debug else logging.INFO,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
discover(args.out)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,529 @@
"""
scrape_recipes.py fetch full recipe data for slugs in pc_slugs.jsonl.
For each slug:
1. Try Wayback /api/v1/products/<slug> oldest capture first (pre-HelloFresh
acquisition data is more complete).
2. If instructions are empty, try the recipe HTML page via Wayback and parse
inline JSON state or structured markup.
3. Merge with metadata already in the manifest (title, tags, cook_time, etc.)
4. Emit one row per recipe to recipes_purplecarrot.parquet in food.com columnar
format so build_recipe_index.py can import it unchanged.
Output columns (food.com schema + PC extras ignored by the indexer):
RecipeId, Name, Subtitle, RecipeIngredientParts, RecipeInstructions,
RecipeCategory, Keywords, Calories, FatContent, ProteinContent,
SodiumContent, SugarContent, CarbohydrateContent, FiberContent,
RecipeServings, Description, ImageURL, CookTime, Slug, Source
Usage:
conda run -n cf python -m scripts.pipeline.purple_carrot.scrape_recipes
conda run -n cf python -m scripts.pipeline.purple_carrot.scrape_recipes \\
--slugs /Library/Assets/kiwi/pipeline/pc_slugs.jsonl \\
--out /Library/Assets/kiwi/pipeline/recipes_purplecarrot.parquet \\
--resume
"""
from __future__ import annotations
import argparse
import json
import logging
import re
import time
from pathlib import Path
from typing import Any
import requests
logger = logging.getLogger(__name__)
CDX_BASE = "http://web.archive.org/cdx/search/cdx"
WB_BASE = "https://web.archive.org/web"
PC_HOST = "www.purplecarrot.com"
REPLAY_DELAY = 1.2
CDX_DELAY = 0.5
DEFAULT_SLUGS = Path("/Library/Assets/kiwi/pipeline/pc_slugs.jsonl")
DEFAULT_OUT = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot.parquet")
# Inline JSON state embedded by the SSR renderer — used as fallback HTML parser
_NEXT_DATA_RE = re.compile(r'<script id="__NEXT_DATA__"[^>]*>(.*?)</script>', re.DOTALL)
_REDUX_STATE_RE = re.compile(r'window\.__INITIAL_STATE__\s*=\s*(\{.*?\});\s*\n', re.DOTALL)
# ── Wayback helpers ───────────────────────────────────────────────────────────
def _cdx_timestamps(slug: str) -> list[str]:
"""Return all captured timestamps for a product slug, oldest first."""
url = f"{PC_HOST}/api/v1/products/{slug}"
try:
resp = requests.get(
CDX_BASE,
params={
"url": url,
"output": "json",
"fl": "timestamp,statuscode",
"filter": "statuscode:200",
"limit": "20",
},
timeout=20,
)
resp.raise_for_status()
rows = resp.json()
if len(rows) < 2:
return []
return [row[0] for row in rows[1:]] # timestamps only, oldest first
except Exception as exc:
logger.debug("CDX timestamps failed for %s: %s", slug, exc)
return []
def _wayback_json(url: str, timestamp: str) -> Any | None:
replay = f"{WB_BASE}/{timestamp}/{url}"
for attempt in range(3):
try:
resp = requests.get(replay, timeout=30)
if resp.status_code == 200:
return resp.json()
if resp.status_code in (404, 410):
return None
except Exception as exc:
logger.debug("Wayback JSON attempt %d failed (%s): %s", attempt + 1, url, exc)
time.sleep(2 ** attempt)
return None
def _wayback_html(url: str, timestamp: str) -> str | None:
replay = f"{WB_BASE}/{timestamp}/{url}"
for attempt in range(3):
try:
resp = requests.get(replay, timeout=30)
if resp.status_code == 200:
return resp.text
if resp.status_code in (404, 410):
return None
except Exception as exc:
logger.debug("Wayback HTML attempt %d failed (%s): %s", attempt + 1, url, exc)
time.sleep(2 ** attempt)
return None
# ── Recipe extraction from API JSON ──────────────────────────────────────────
def _extract_from_api(data: dict) -> dict | None:
"""Parse a /api/v1/products/<slug> response into our recipe dict.
Returns None if the response has no usable content (empty title, etc.).
Returns a partial dict if only some fields are populated caller merges
with manifest metadata.
"""
if not data or not isinstance(data, dict):
return None
title = data.get("title", "").strip()
subtitle = data.get("subtitle", "").strip()
slug = data.get("slug", "")
skus = data.get("skus") or []
sku = skus[0] if skus else {}
# Instructions: list of {step_number, title, description}
raw_instructions = sku.get("instructions") or []
steps: list[str] = []
for step in sorted(raw_instructions, key=lambda s: s.get("step_number", 0)):
parts = []
if step.get("title"):
parts.append(step["title"])
if step.get("description"):
parts.append(step["description"])
if parts:
steps.append(". ".join(parts))
# Ingredients: may be in ingredients_quantity or ingredients
raw_ingr = sku.get("ingredients_quantity") or sku.get("ingredients") or []
ingredients: list[str] = []
for item in raw_ingr:
if isinstance(item, dict):
qty = item.get("quantity") or item.get("qty") or ""
unit = item.get("unit") or ""
name = item.get("name") or item.get("ingredient", {}).get("name", "") if isinstance(item.get("ingredient"), dict) else item.get("ingredient_name", "")
raw = item.get("raw") or item.get("display_name") or ""
line = raw or " ".join(filter(None, [str(qty), str(unit), str(name)])).strip()
if line:
ingredients.append(line)
elif isinstance(item, str) and item.strip():
ingredients.append(item.strip())
nutrition = sku.get("nutrition_label") or {}
calories = _num(nutrition.get("calories") or sku.get("calories"))
fat = _num(nutrition.get("total_fat") or sku.get("fat"))
protein = _num(nutrition.get("protein") or sku.get("protein"))
sodium = _num(nutrition.get("sodium") or sku.get("sodium"))
sugar = _num(nutrition.get("sugar") or nutrition.get("total_sugars"))
carbs = _num(nutrition.get("total_carbohydrate") or sku.get("carbs"))
fiber = _num(nutrition.get("dietary_fiber") or sku.get("fiber"))
tags = sku.get("tags") or data.get("tags") or []
category = sku.get("meal_type") or sku.get("product_type") or ""
servings = _num(sku.get("servings"))
cook_time = sku.get("prep_and_cook_time") or ""
description = sku.get("description") or ""
images = sku.get("hero_images") or sku.get("image_versions") or []
image_url = ""
if images and isinstance(images[0], dict):
image_url = images[0].get("image_url") or images[0].get("url") or ""
if not image_url and data.get("square_image"):
sq = data["square_image"]
image_url = sq.get("url") if isinstance(sq, dict) else ""
return {
"slug": slug,
"title": title,
"subtitle": subtitle,
"steps": steps,
"ingredients": ingredients,
"category": category,
"tags": tags,
"calories": calories,
"fat": fat,
"protein": protein,
"sodium": sodium,
"sugar": sugar,
"carbs": carbs,
"fiber": fiber,
"servings": servings,
"cook_time": cook_time,
"description": description,
"image_url": image_url,
"has_full_recipe": bool(steps and ingredients),
}
def _num(val: Any) -> float | None:
if val is None:
return None
try:
v = float(str(val).replace("g", "").replace("mg", "").split()[0])
return v if v > 0 else None
except Exception:
return None
# ── Fallback: HTML inline state parsing ──────────────────────────────────────
def _extract_from_html(html: str, slug: str) -> dict | None:
"""Try to pull recipe data from inline JS state in older SSR pages."""
# Attempt 1: Next.js __NEXT_DATA__
m = _NEXT_DATA_RE.search(html)
if m:
try:
state = json.loads(m.group(1))
# Walk the Next.js page props tree looking for recipe data
props = state.get("props", {}).get("pageProps", {})
recipe = props.get("recipe") or props.get("product")
if recipe and isinstance(recipe, dict) and recipe.get("title"):
return _extract_from_api(recipe)
except Exception:
pass
# Attempt 2: Redux __INITIAL_STATE__
m = _REDUX_STATE_RE.search(html)
if m:
try:
state = json.loads(m.group(1))
# Try common Redux state shapes
for key in ("recipe", "product", "currentRecipe", "currentProduct"):
recipe = state.get(key)
if recipe and isinstance(recipe, dict) and recipe.get("title"):
return _extract_from_api(recipe)
except Exception:
pass
# Attempt 3: JSON-LD structured data
ld_matches = re.findall(
r'<script[^>]+type=["\']application/ld\+json["\'][^>]*>(.*?)</script>',
html, re.DOTALL
)
for raw in ld_matches:
try:
ld = json.loads(raw)
if isinstance(ld, list):
ld = next((x for x in ld if x.get("@type") == "Recipe"), None)
if not ld or ld.get("@type") != "Recipe":
continue
steps = []
for inst in (ld.get("recipeInstructions") or []):
if isinstance(inst, dict):
steps.append(inst.get("text", ""))
elif isinstance(inst, str):
steps.append(inst)
ingredients = ld.get("recipeIngredient") or []
return {
"slug": slug,
"title": ld.get("name", ""),
"subtitle": "",
"steps": [s for s in steps if s],
"ingredients": [i for i in ingredients if i],
"category": ld.get("recipeCategory", ""),
"tags": ld.get("keywords", "").split(",") if isinstance(ld.get("keywords"), str) else [],
"calories": _num((ld.get("nutrition") or {}).get("calories")),
"fat": None, "protein": None, "sodium": None,
"sugar": None, "carbs": None, "fiber": None,
"servings": _num(ld.get("recipeYield")),
"cook_time": str(ld.get("totalTime") or ld.get("cookTime") or ""),
"description": ld.get("description", ""),
"image_url": (ld["image"][0] if isinstance(ld.get("image"), list) else ld.get("image", "")) or "",
"has_full_recipe": True,
}
except Exception:
pass
return None
# ── Per-slug fetch ─────────────────────────────────────────────────────────────
def fetch_recipe(slug: str, manifest_meta: dict) -> dict | None:
"""Fetch the fullest available recipe data for a slug from Wayback.
Returns a merged dict of manifest metadata + API/HTML-extracted content.
"""
api_url = f"https://{PC_HOST}/api/v1/products/{slug}"
html_url = f"https://{PC_HOST}/recipe/{slug}"
recipe: dict | None = None
# Try product API — oldest captures are most likely to have full data
timestamps = _cdx_timestamps(slug)
time.sleep(CDX_DELAY)
if not timestamps and manifest_meta.get("wayback_ts"):
timestamps = [manifest_meta["wayback_ts"]]
for ts in timestamps:
data = _wayback_json(api_url, ts)
time.sleep(REPLAY_DELAY)
if not data:
continue
candidate = _extract_from_api(data)
if not candidate:
continue
recipe = candidate
if recipe.get("has_full_recipe"):
logger.debug("[%s] Full recipe from API (ts=%s)", slug, ts)
break
logger.debug("[%s] Partial API data (ts=%s) — trying HTML fallback", slug, ts)
# HTML fallback when API has no steps/ingredients
if not recipe or not recipe.get("has_full_recipe"):
html_cdx_url = f"{PC_HOST}/recipe/{slug}"
try:
html_resp = requests.get(
CDX_BASE,
params={
"url": html_cdx_url,
"output": "json",
"fl": "timestamp,statuscode",
"filter": "statuscode:200",
"limit": "5",
},
timeout=20,
)
html_ts_rows = html_resp.json() if html_resp.ok else []
html_timestamps = [row[0] for row in html_ts_rows[1:]] if len(html_ts_rows) > 1 else []
except Exception:
html_timestamps = []
time.sleep(CDX_DELAY)
for ts in html_timestamps:
html = _wayback_html(html_url, ts)
time.sleep(REPLAY_DELAY)
if not html:
continue
html_recipe = _extract_from_html(html, slug)
if html_recipe and html_recipe.get("has_full_recipe"):
logger.debug("[%s] Full recipe from HTML (ts=%s)", slug, ts)
recipe = html_recipe
break
# Build merged record: manifest metadata fills any gaps from API/HTML
merged: dict = {
"slug": slug,
"title": manifest_meta.get("title", ""),
"subtitle": manifest_meta.get("subtitle", ""),
"steps": [],
"ingredients": [],
"category": "",
"tags": manifest_meta.get("tags") or [],
"calories": None,
"fat": None,
"protein": None,
"sodium": None,
"sugar": None,
"carbs": None,
"fiber": None,
"servings": manifest_meta.get("serving_size"),
"cook_time": manifest_meta.get("cook_time", ""),
"description": manifest_meta.get("description", ""),
"image_url": manifest_meta.get("image_url", ""),
"source": "purple_carrot",
"wayback_ts": manifest_meta.get("wayback_ts", ""),
"has_full_recipe": False,
}
if recipe:
for key in recipe:
# Prefer API/HTML data; keep manifest value only when API field is empty
val = recipe[key]
if val or key not in merged or not merged[key]:
merged[key] = val
if not merged["title"]:
logger.warning("[%s] No title — skipping", slug)
return None
return merged
# ── Output formatting ─────────────────────────────────────────────────────────
def _to_dataframe_row(r: dict) -> dict:
"""Convert merged recipe dict to food.com-compatible parquet row."""
# Build plain-text input for allrecipes-style corpus compatibility
lines = [r["title"]]
if r.get("subtitle"):
lines.append(r["subtitle"])
if r.get("description"):
lines.append("")
lines.append(r["description"])
if r.get("ingredients"):
lines += ["", "Ingredients:"] + [f"- {i}" for i in r["ingredients"]]
if r.get("steps"):
lines += ["", "Directions:"] + [f"- {s}" for s in r["steps"]]
plain_text = "\n".join(lines)
source_url = f"https://www.purplecarrot.com/recipe/{r['slug']}"
return {
# food.com schema columns (used by build_recipe_index.py)
"RecipeId": f"pc_{r['slug']}",
"Name": r["title"],
"RecipeIngredientParts": r.get("ingredients") or [],
"RecipeInstructions": r.get("steps") or [],
"RecipeCategory": r.get("category", ""),
"Keywords": r.get("tags") or [],
"Calories": r.get("calories"),
"FatContent": r.get("fat"),
"ProteinContent": r.get("protein"),
"SodiumContent": r.get("sodium"),
"SugarContent": r.get("sugar"),
"CarbohydrateContent": r.get("carbs"),
"FiberContent": r.get("fiber"),
"RecipeServings": r.get("servings"),
# PC-specific extras (ignored by indexer, used by training pipeline)
"Subtitle": r.get("subtitle", ""),
"Description": r.get("description", ""),
"ImageURL": r.get("image_url", ""),
"CookTime": r.get("cook_time", ""),
"Slug": r["slug"],
"Source": "purple_carrot",
"SourceURL": source_url, # canonical attribution link shown in recipe UI
"HasFullRecipe": r.get("has_full_recipe", False),
"WaybackTs": r.get("wayback_ts", ""),
# Also emit plain-text input for allrecipes-compatible corpus search
"input": plain_text,
}
# ── Main ──────────────────────────────────────────────────────────────────────
def scrape(slugs_file: Path, out_file: Path, resume: bool = True) -> None:
import pandas as pd
# Load manifest
if not slugs_file.exists():
logger.error("Slugs manifest not found: %s", slugs_file)
return
manifest: dict[str, dict] = {}
with open(slugs_file) as f:
for line in f:
line = line.strip()
if line:
rec = json.loads(line)
slug = rec["slug"]
# Keep the richest metadata if slug appears from multiple sources
if slug not in manifest or rec.get("source") == "menu":
manifest[slug] = rec
logger.info("Manifest: %d unique slugs", len(manifest))
# Load already-scraped slugs for resume
done_slugs: set[str] = set()
existing_rows: list[dict] = []
if resume and out_file.exists():
try:
existing_df = pd.read_parquet(out_file)
done_slugs = set(existing_df["Slug"].tolist())
existing_rows = existing_df.to_dict("records")
logger.info("Resume: %d already scraped", len(done_slugs))
except Exception as exc:
logger.warning("Could not load existing parquet for resume: %s", exc)
todo = [s for s in manifest if s not in done_slugs]
logger.info("%d slugs to fetch", len(todo))
rows = list(existing_rows)
for i, slug in enumerate(todo, 1):
logger.info("[%d/%d] %s", i, len(todo), slug)
recipe = fetch_recipe(slug, manifest[slug])
if recipe:
rows.append(_to_dataframe_row(recipe))
status = "full" if recipe.get("has_full_recipe") else "partial"
logger.info(" -> %s (%s)", recipe.get("title", "?"), status)
else:
logger.warning(" -> skipped (no title)")
# Write checkpoint every 50 recipes
if i % 50 == 0:
_write_parquet(rows, out_file)
logger.info("Checkpoint: %d recipes written", len(rows))
_write_parquet(rows, out_file)
full = sum(1 for r in rows if r.get("HasFullRecipe"))
logger.info(
"Done. %d recipes written to %s (%d full, %d partial).",
len(rows), out_file, full, len(rows) - full,
)
def _write_parquet(rows: list[dict], out_file: Path) -> None:
import pandas as pd
out_file.parent.mkdir(parents=True, exist_ok=True)
pd.DataFrame(rows).to_parquet(out_file, index=False)
def main() -> None:
parser = argparse.ArgumentParser(description="Scrape Purple Carrot recipes from Wayback")
parser.add_argument("--slugs", type=Path, default=DEFAULT_SLUGS)
parser.add_argument("--out", type=Path, default=DEFAULT_OUT)
parser.add_argument(
"--no-resume", dest="resume", action="store_false",
help="Start fresh (ignore existing parquet)",
)
parser.add_argument("--debug", action="store_true")
args = parser.parse_args()
logging.basicConfig(
level=logging.DEBUG if args.debug else logging.INFO,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
scrape(args.slugs, args.out, resume=args.resume)
if __name__ == "__main__":
main()