Compare commits
6 commits
2df17ec719
...
51a48a430b
| Author | SHA1 | Date | |
|---|---|---|---|
| 51a48a430b | |||
| b326d4aa6e | |||
| 7cad503b35 | |||
| 430600c1af | |||
| 21a9b85067 | |||
| c72b4415db |
7 changed files with 948 additions and 30 deletions
13
.env.example
13
.env.example
|
|
@ -21,10 +21,12 @@ DATA_DIR=./data
|
||||||
# IP this machine advertises to the coordinator (must be reachable from coordinator host)
|
# IP this machine advertises to the coordinator (must be reachable from coordinator host)
|
||||||
# CF_ORCH_ADVERTISE_HOST=10.1.10.71
|
# CF_ORCH_ADVERTISE_HOST=10.1.10.71
|
||||||
|
|
||||||
# CF-core hosted coordinator (managed cloud GPU inference — Paid+ tier)
|
# GPU inference server (cf-orch coordinator for recipe scan, LLM generation, etc.)
|
||||||
# Set CF_ORCH_URL to use a hosted cf-orch coordinator instead of self-hosting.
|
# GPU_SERVER_URL: set to your local cf-orch coordinator (self-hosted rack).
|
||||||
# CF_LICENSE_KEY is read automatically by CFOrchClient for bearer auth.
|
# CF_ORCH_URL is the backward-compat alias — both are honoured.
|
||||||
# CF_ORCH_URL=https://orch.circuitforge.tech
|
# Paid+ default: when CF_LICENSE_KEY is present and neither URL is set,
|
||||||
|
# the app automatically points to https://orch.circuitforge.tech.
|
||||||
|
# GPU_SERVER_URL=http://10.1.10.71:7700
|
||||||
# CF_LICENSE_KEY=CFG-KIWI-xxxx-xxxx-xxxx
|
# CF_LICENSE_KEY=CFG-KIWI-xxxx-xxxx-xxxx
|
||||||
|
|
||||||
# LLM backend — env-var auto-config (no llm.yaml needed for bare-metal users)
|
# LLM backend — env-var auto-config (no llm.yaml needed for bare-metal users)
|
||||||
|
|
@ -57,6 +59,9 @@ CF_APP_NAME=kiwi
|
||||||
# Unset = auto-detect: true if CLOUD_MODE or circuitforge_orch is installed (paid+ local).
|
# Unset = auto-detect: true if CLOUD_MODE or circuitforge_orch is installed (paid+ local).
|
||||||
# Set false to force LocalScheduler even when cf-orch is present.
|
# Set false to force LocalScheduler even when cf-orch is present.
|
||||||
# USE_ORCH_SCHEDULER=false
|
# USE_ORCH_SCHEDULER=false
|
||||||
|
# GPU_SERVER_URL: cf-orch coordinator endpoint. Required for recipe scan (cf-docuvision)
|
||||||
|
# and LLM features on a self-hosted rack. CF_ORCH_URL is the backward-compat alias.
|
||||||
|
# GPU_SERVER_URL=http://10.1.10.71:7700
|
||||||
|
|
||||||
# Cloud mode (set in compose.cloud.yml; also set here for reference)
|
# Cloud mode (set in compose.cloud.yml; also set here for reference)
|
||||||
# CLOUD_DATA_ROOT=/devl/kiwi-cloud-data
|
# CLOUD_DATA_ROOT=/devl/kiwi-cloud-data
|
||||||
|
|
|
||||||
|
|
@ -65,9 +65,24 @@ class Settings:
|
||||||
# Quality
|
# Quality
|
||||||
MIN_QUALITY_SCORE: float = float(os.environ.get("MIN_QUALITY_SCORE", "50.0"))
|
MIN_QUALITY_SCORE: float = float(os.environ.get("MIN_QUALITY_SCORE", "50.0"))
|
||||||
|
|
||||||
# CF-core resource coordinator (VRAM lease management)
|
# CF-core resource coordinator (VRAM lease management — lease broker, not inference)
|
||||||
COORDINATOR_URL: str = os.environ.get("COORDINATOR_URL", "http://localhost:7700")
|
COORDINATOR_URL: str = os.environ.get("COORDINATOR_URL", "http://localhost:7700")
|
||||||
|
|
||||||
|
# GPU inference server URL
|
||||||
|
# Priority: GPU_SERVER_URL env var → CF_ORCH_URL env var (backward compat)
|
||||||
|
# → https://orch.circuitforge.tech when CF_LICENSE_KEY is present (Paid+)
|
||||||
|
# Resolved value is written back to os.environ["CF_ORCH_URL"] at startup so
|
||||||
|
# all service-layer callers that read CF_ORCH_URL directly see the right URL.
|
||||||
|
GPU_SERVER_URL: str | None = (
|
||||||
|
os.environ.get("GPU_SERVER_URL")
|
||||||
|
or os.environ.get("CF_ORCH_URL")
|
||||||
|
or (
|
||||||
|
"https://orch.circuitforge.tech"
|
||||||
|
if os.environ.get("CF_LICENSE_KEY")
|
||||||
|
else None
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
# Hosted cf-orch coordinator — bearer token for managed cloud GPU inference (Paid+)
|
# Hosted cf-orch coordinator — bearer token for managed cloud GPU inference (Paid+)
|
||||||
# CFOrchClient reads CF_LICENSE_KEY automatically; exposed here for startup validation.
|
# CFOrchClient reads CF_LICENSE_KEY automatically; exposed here for startup validation.
|
||||||
CF_LICENSE_KEY: str | None = os.environ.get("CF_LICENSE_KEY")
|
CF_LICENSE_KEY: str | None = os.environ.get("CF_LICENSE_KEY")
|
||||||
|
|
@ -108,3 +123,9 @@ class Settings:
|
||||||
|
|
||||||
|
|
||||||
settings = Settings()
|
settings = Settings()
|
||||||
|
|
||||||
|
# Normalise GPU_SERVER_URL into CF_ORCH_URL so every service-layer caller that
|
||||||
|
# reads os.environ.get("CF_ORCH_URL") sees the resolved value, including the
|
||||||
|
# Paid+ cloud default injected above.
|
||||||
|
if settings.GPU_SERVER_URL:
|
||||||
|
os.environ["CF_ORCH_URL"] = settings.GPU_SERVER_URL
|
||||||
|
|
|
||||||
|
|
@ -215,6 +215,35 @@ def _build_ocr_extraction_prompt(ocr_text: str) -> str:
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _call_via_cf_text_vlm(alloc_url: str, image_paths: list[Path], prompt: str) -> str:
    """Send recipe page images plus a prompt to a cf-text OpenAI-compatible chat endpoint.

    Every image is base64-encoded with ``_load_image_b64`` and attached as a
    ``data:image/jpeg`` URL (the JPEG label is applied regardless of the file's
    actual format). Pages after the first are preceded by a short text marker
    so the model knows they belong to the same recipe. The prompt is appended
    as the final text part of the single user message.

    Args:
        alloc_url: Base URL of the allocated service; a trailing slash is tolerated.
        image_paths: Page images, in reading order.
        prompt: Extraction instructions placed after the images.

    Returns:
        The stripped text content of the first completion choice.

    Raises:
        httpx.HTTPStatusError: when the endpoint answers with an error status.
    """
    import httpx

    message_parts: list[dict] = []
    for page_no, image_path in enumerate(image_paths, start=1):
        if page_no > 1:
            message_parts.append(
                {"type": "text", "text": f"(Page {page_no} of the same recipe:)"}
            )
        encoded = _load_image_b64(image_path)
        message_parts.append(
            {
                "type": "image_url",
                "image_url": {"url": f"data:image/jpeg;base64,{encoded}"},
            }
        )
    message_parts.append({"type": "text", "text": prompt})

    endpoint = f"{alloc_url.rstrip('/')}/v1/chat/completions"
    payload = {
        "model": "local",
        "messages": [{"role": "user", "content": message_parts}],
        "max_tokens": 2048,
        "temperature": 0.0,
    }
    response = httpx.post(endpoint, json=payload, timeout=180.0)
    response.raise_for_status()

    reply = response.json()
    return reply["choices"][0]["message"]["content"].strip()
|
||||||
|
|
||||||
|
|
||||||
def _call_vision_backend(
|
def _call_vision_backend(
|
||||||
image_paths: list[Path],
|
image_paths: list[Path],
|
||||||
prompt: str,
|
prompt: str,
|
||||||
|
|
@ -222,7 +251,7 @@ def _call_vision_backend(
|
||||||
) -> str:
|
) -> str:
|
||||||
"""Dispatch to the best available vision backend.
|
"""Dispatch to the best available vision backend.
|
||||||
|
|
||||||
Priority: cf-orch docuvision (OCR + text LLM) -> local Qwen2.5-VL -> Anthropic API.
|
Priority: cf-orch (Qwen2-VL GGUF via cf-text) -> local Qwen2.5-VL -> Anthropic API.
|
||||||
Raises RuntimeError with a clear message when no backend is available.
|
Raises RuntimeError with a clear message when no backend is available.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
|
|
@ -237,9 +266,8 @@ def _call_vision_backend(
|
||||||
|
|
||||||
errors: list[str] = []
|
errors: list[str] = []
|
||||||
|
|
||||||
# 1. Try cf-orch task allocation → cf-docuvision OCR, then text LLM structuring.
|
# 1. Try cf-orch task allocation → cf-docuvision (Qwen2-VL GGUF via llama.cpp).
|
||||||
# Two-step: docuvision extracts text from the image(s), then LLMRouter
|
# Two-step: docuvision OCRs the image(s), then LLMRouter structures the text into JSON.
|
||||||
# converts the OCR text to structured recipe JSON using the extraction prompt.
|
|
||||||
cf_orch_url = os.environ.get("CF_ORCH_URL")
|
cf_orch_url = os.environ.get("CF_ORCH_URL")
|
||||||
if cf_orch_url:
|
if cf_orch_url:
|
||||||
try:
|
try:
|
||||||
|
|
@ -250,7 +278,6 @@ def _call_vision_backend(
|
||||||
try:
|
try:
|
||||||
_progress("allocating", "Starting vision service...")
|
_progress("allocating", "Starting vision service...")
|
||||||
with task_allocate("kiwi", "recipe_scan", service_hint="cf-docuvision", ttl_s=120.0) as alloc:
|
with task_allocate("kiwi", "recipe_scan", service_hint="cf-docuvision", ttl_s=120.0) as alloc:
|
||||||
# Step 1: OCR each image via cf-docuvision
|
|
||||||
_progress("scanning", "Extracting recipe text from photo...")
|
_progress("scanning", "Extracting recipe text from photo...")
|
||||||
doc_client = DocuvisionClient(alloc.url)
|
doc_client = DocuvisionClient(alloc.url)
|
||||||
ocr_parts: list[str] = []
|
ocr_parts: list[str] = []
|
||||||
|
|
@ -263,9 +290,11 @@ def _call_vision_backend(
|
||||||
if not combined_ocr.strip():
|
if not combined_ocr.strip():
|
||||||
raise ValueError("Docuvision returned no text — image may not be a recipe")
|
raise ValueError("Docuvision returned no text — image may not be a recipe")
|
||||||
|
|
||||||
# Step 2: Text LLM structures OCR output into recipe JSON
|
|
||||||
_progress("structuring", "Parsing recipe structure...")
|
_progress("structuring", "Parsing recipe structure...")
|
||||||
text = LLMRouter().complete(_build_ocr_extraction_prompt(combined_ocr))
|
text = LLMRouter().complete(
|
||||||
|
_build_ocr_extraction_prompt(combined_ocr),
|
||||||
|
system="You are a recipe data extractor. Return ONLY valid JSON. No markdown, no explanation, no code fences.",
|
||||||
|
)
|
||||||
if text:
|
if text:
|
||||||
return text
|
return text
|
||||||
|
|
||||||
|
|
@ -303,40 +332,76 @@ def _normalize_ingredient_name(name: str) -> str:
|
||||||
return name.lower().strip()
|
return name.lower().strip()
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_json_object(text: str) -> str | None:
|
||||||
|
"""Return the first balanced JSON object from text, or None if not found.
|
||||||
|
|
||||||
|
Uses brace-counting rather than a greedy regex so trailing prose and
|
||||||
|
nested objects are handled correctly.
|
||||||
|
"""
|
||||||
|
start = text.find("{")
|
||||||
|
if start == -1:
|
||||||
|
return None
|
||||||
|
depth = 0
|
||||||
|
in_string = False
|
||||||
|
escape_next = False
|
||||||
|
for i, ch in enumerate(text[start:], start):
|
||||||
|
if escape_next:
|
||||||
|
escape_next = False
|
||||||
|
continue
|
||||||
|
if ch == "\\" and in_string:
|
||||||
|
escape_next = True
|
||||||
|
continue
|
||||||
|
if ch == '"':
|
||||||
|
in_string = not in_string
|
||||||
|
continue
|
||||||
|
if in_string:
|
||||||
|
continue
|
||||||
|
if ch == "{":
|
||||||
|
depth += 1
|
||||||
|
elif ch == "}":
|
||||||
|
depth -= 1
|
||||||
|
if depth == 0:
|
||||||
|
return text[start : i + 1]
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def _parse_scanner_json(raw_text: str) -> dict:
|
def _parse_scanner_json(raw_text: str) -> dict:
|
||||||
"""Extract and return the JSON dict from VLM output.
|
"""Extract and return the JSON dict from VLM output.
|
||||||
|
|
||||||
Handles:
|
Handles:
|
||||||
- Pure JSON
|
- Pure JSON
|
||||||
- JSON wrapped in ```json ... ``` markdown fences
|
- JSON in ```json ... ``` markdown fences
|
||||||
- JSON preceded by a line of prose ("Here is the recipe: {...}")
|
- Qwen3-style <think>...</think> or <thinking>...</thinking> preambles
|
||||||
|
- JSON preceded or followed by prose
|
||||||
|
|
||||||
Raises ValueError on not_a_recipe or unparseable output.
|
Raises ValueError on not_a_recipe or unparseable output.
|
||||||
"""
|
"""
|
||||||
text = raw_text.strip()
|
text = raw_text.strip()
|
||||||
|
|
||||||
# Strip markdown fences if present
|
# Strip thinking-token blocks emitted by reasoning models (Qwen3, DeepSeek-R1, etc.)
|
||||||
if text.startswith("```"):
|
text = re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL | re.IGNORECASE).strip()
|
||||||
parts = text.split("```")
|
text = re.sub(r"<thinking>.*?</thinking>", "", text, flags=re.DOTALL | re.IGNORECASE).strip()
|
||||||
for part in parts:
|
|
||||||
part = part.strip()
|
|
||||||
if part.startswith("json"):
|
|
||||||
part = part[4:].strip()
|
|
||||||
if part.startswith("{"):
|
|
||||||
text = part
|
|
||||||
break
|
|
||||||
|
|
||||||
# Try direct parse first
|
# Strip markdown fences if present
|
||||||
|
if "```" in text:
|
||||||
|
# Find the content between the first ``` pair
|
||||||
|
fence_match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, re.DOTALL)
|
||||||
|
if fence_match:
|
||||||
|
text = fence_match.group(1).strip()
|
||||||
|
|
||||||
|
# Try direct parse
|
||||||
try:
|
try:
|
||||||
data = json.loads(text)
|
data = json.loads(text)
|
||||||
except json.JSONDecodeError:
|
except json.JSONDecodeError:
|
||||||
# Extract first JSON object embedded in prose
|
# Fall back to brace-balanced extraction from anywhere in the output
|
||||||
match = re.search(r"\{.*\}", text, re.DOTALL)
|
candidate = _extract_json_object(text)
|
||||||
if not match:
|
if not candidate:
|
||||||
|
logger.warning("Could not parse JSON from LLM output (first 400 chars): %r", text[:400])
|
||||||
raise ValueError(f"Could not parse JSON from VLM output: {text[:200]!r}")
|
raise ValueError(f"Could not parse JSON from VLM output: {text[:200]!r}")
|
||||||
try:
|
try:
|
||||||
data = json.loads(match.group(0))
|
data = json.loads(candidate)
|
||||||
except json.JSONDecodeError as exc:
|
except json.JSONDecodeError as exc:
|
||||||
|
logger.warning("Brace-extracted JSON still invalid: %r", candidate[:400])
|
||||||
raise ValueError(f"Could not parse JSON from VLM output: {exc}") from exc
|
raise ValueError(f"Could not parse JSON from VLM output: {exc}") from exc
|
||||||
|
|
||||||
if isinstance(data, dict) and data.get("error") == "not_a_recipe":
|
if isinstance(data, dict) and data.get("error") == "not_a_recipe":
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
|
||||||
|
|
||||||
[project]
|
[project]
|
||||||
name = "kiwi"
|
name = "kiwi"
|
||||||
version = "0.6.0"
|
version = "0.10.0"
|
||||||
description = "Pantry tracking + leftover recipe suggestions"
|
description = "Pantry tracking + leftover recipe suggestions"
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
requires-python = ">=3.11"
|
requires-python = ">=3.11"
|
||||||
|
|
|
||||||
0
scripts/pipeline/purple_carrot/__init__.py
Normal file
0
scripts/pipeline/purple_carrot/__init__.py
Normal file
298
scripts/pipeline/purple_carrot/discover_wayback.py
Normal file
298
scripts/pipeline/purple_carrot/discover_wayback.py
Normal file
|
|
@ -0,0 +1,298 @@
|
||||||
|
"""
|
||||||
|
discover_wayback.py — enumerate Purple Carrot recipe slugs via the Wayback Machine.
|
||||||
|
|
||||||
|
Strategy:
|
||||||
|
1. CDX API → all archived /api/v2/menus/* URLs (multiple timestamps)
|
||||||
|
2. Replay → fetch each menu's menuItems, extract productPath slugs
|
||||||
|
3. CDX API → all archived /api/v1/products/* URLs (direct slug capture)
|
||||||
|
4. CDX API → /recipe-categories/* HTML pages for older slugs
|
||||||
|
5. Deduplicate and write manifest to OUT_FILE
|
||||||
|
|
||||||
|
Output (JSONL, one record per recipe):
|
||||||
|
{"slug": "...", "title": "...", "subtitle": "...", "cook_time": "...",
|
||||||
|
"tags": [...], "serving_size": 2, "image_url": "...", "description": "...",
|
||||||
|
"wayback_ts": "20260412150557", "source": "menu|product_api|category_page"}
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
conda run -n cf python -m scripts.pipeline.purple_carrot.discover_wayback
|
||||||
|
conda run -n cf python -m scripts.pipeline.purple_carrot.discover_wayback --out /Library/Assets/kiwi/pipeline/pc_slugs.jsonl
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
from urllib.parse import urlencode
|
||||||
|
|
||||||
|
import requests
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
CDX_BASE = "http://web.archive.org/cdx/search/cdx"
|
||||||
|
WB_BASE = "https://web.archive.org/web"
|
||||||
|
PC_HOST = "www.purplecarrot.com"
|
||||||
|
|
||||||
|
# Polite delay between Wayback replay fetches (seconds)
|
||||||
|
REPLAY_DELAY = 1.0
|
||||||
|
CDX_DELAY = 0.5
|
||||||
|
|
||||||
|
DEFAULT_OUT = Path("/Library/Assets/kiwi/pipeline/pc_slugs.jsonl")
|
||||||
|
|
||||||
|
|
||||||
|
# ── CDX helpers ───────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def cdx_query(url_pattern: str, **kwargs) -> list[dict]:
    """Run a CDX search and return a list of result dicts.

    The query asks for JSON output with the ``original``, ``timestamp`` and
    ``statuscode`` fields, collapsed by ``urlkey`` and filtered to status 200.
    Extra keyword arguments are merged into the query parameters last, so
    they can add to or override these defaults (e.g. ``limit="500"``).

    Up to three attempts are made. Failures are logged and followed by an
    exponential back-off, except after the final attempt where waiting would
    only delay the empty result.

    Args:
        url_pattern: CDX ``url`` parameter, e.g. ``"host/path/*"``.
        **kwargs: Additional CDX query parameters.

    Returns:
        One dict per result row, keyed by the header row of the response;
        an empty list when there are no rows or every attempt fails.
    """
    max_attempts = 3
    params = {
        "url": url_pattern,
        "output": "json",
        "fl": "original,timestamp,statuscode",
        "collapse": "urlkey",
        "filter": "statuscode:200",
        **kwargs,
    }
    for attempt in range(max_attempts):
        try:
            resp = requests.get(CDX_BASE, params=params, timeout=30)
            resp.raise_for_status()
            rows = resp.json()
            # The first row is the header; fewer than two rows means no data.
            if not rows or len(rows) < 2:
                return []
            headers = rows[0]
            return [dict(zip(headers, row)) for row in rows[1:]]
        except Exception as exc:
            # Deliberately broad: discovery is best-effort and must not abort
            # the whole run on one bad CDX response.
            logger.warning("CDX attempt %d failed: %s", attempt + 1, exc)
            if attempt < max_attempts - 1:
                time.sleep(2 ** attempt)
    return []
|
||||||
|
|
||||||
|
|
||||||
|
def wayback_get(url: str, timestamp: str) -> Any | None:
    """Fetch a Wayback replay of a URL and return its parsed JSON body.

    Makes up to three attempts. A 200 response is decoded as JSON and
    returned; a 404 ends the attempts immediately with None. Any other status
    code, a request error, or an undecodable body counts as a failed attempt
    and is followed by an exponential back-off before the next try, so
    throttling or server errors are not answered with an immediate re-request.

    Args:
        url: Original (archived) URL to replay.
        timestamp: Wayback capture timestamp placed in the replay URL.

    Returns:
        The decoded JSON value, or None when the capture is missing or all
        attempts fail.
    """
    max_attempts = 3
    replay_url = f"{WB_BASE}/{timestamp}/{url}"
    for attempt in range(max_attempts):
        try:
            resp = requests.get(replay_url, timeout=30)
            if resp.status_code == 200:
                return resp.json()
            if resp.status_code == 404:
                return None
            logger.warning(
                "Wayback GET attempt %d got HTTP %d for %s",
                attempt + 1, resp.status_code, url,
            )
        except Exception as exc:
            # Best-effort fetch: log and retry rather than abort the pass.
            logger.warning("Wayback GET attempt %d failed for %s: %s", attempt + 1, url, exc)
        # Back off after every failed attempt except the last one.
        if attempt < max_attempts - 1:
            time.sleep(2 ** attempt)
    return None
|
||||||
|
|
||||||
|
|
||||||
|
# ── Slug extraction ───────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def slug_from_product_path(path: str) -> str | None:
    """Return the final path segment of a product path.

    Example: ``'/recipe/foo-bar-baz'`` gives ``'foo-bar-baz'``. Leading and
    trailing slashes are ignored. A falsy path, or one whose last segment is
    empty, gives None.
    """
    if not path:
        return None
    last_segment = path.strip("/").rsplit("/", 1)[-1]
    return last_segment if last_segment else None
|
||||||
|
|
||||||
|
|
||||||
|
def _menu_item_to_record(item: dict, wayback_ts: str) -> dict | None:
    """Convert one menu item into a manifest record.

    The slug comes from the item's ``productPath``; items without a usable
    slug yield None. Missing text fields default to an empty string, a
    missing or empty ``filterTags`` becomes an empty list, and
    ``servingSize`` is passed through as-is (None when absent). The record's
    ``source`` is always ``"menu"``.
    """
    slug = slug_from_product_path(item.get("productPath", ""))
    if not slug:
        return None

    record: dict = {"slug": slug}
    record["title"] = item.get("title", "")
    record["subtitle"] = item.get("subtitle", "")
    record["cook_time"] = item.get("cookTime", "")
    record["tags"] = item.get("filterTags") or []
    record["serving_size"] = item.get("servingSize")
    record["image_url"] = item.get("imageURL", "")
    record["description"] = item.get("description", "")
    record["wayback_ts"] = wayback_ts
    record["source"] = "menu"
    return record
|
||||||
|
|
||||||
|
|
||||||
|
# ── Discovery passes ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def pass_menus(seen_slugs: set[str]) -> list[dict]:
    """Collect recipe records from archived ``/api/v2/menus/<id>`` captures.

    Lists archived menu URLs through CDX, keeps only URLs whose last path
    segment is numeric (individual menus rather than the listing endpoint),
    replays each distinct menu id once, and turns its ``menuItems`` into
    records.

    Args:
        seen_slugs: Slugs already known. Updated in place with every slug
            added by this pass.

    Returns:
        Records for slugs that were not in ``seen_slugs`` on entry.
    """
    found: list[dict] = []

    # Enumerate every distinct archived menu URL.
    captures = cdx_query(f"{PC_HOST}/api/v2/menus/*", limit="500")
    logger.info("CDX: %d archived menu URLs found", len(captures))
    time.sleep(CDX_DELAY)

    handled_ids: set[str] = set()

    for capture in captures:
        original_url = capture["original"]
        capture_ts = capture["timestamp"]

        base_url = original_url.split("?")[0]
        menu_id = base_url.rstrip("/").split("/")[-1]

        # Non-numeric tail means the listing endpoint, not a single menu.
        if not menu_id.isdigit():
            continue
        if menu_id in handled_ids:
            continue
        handled_ids.add(menu_id)

        logger.info("Fetching menu %s (ts=%s) ...", menu_id, capture_ts)
        payload = wayback_get(base_url + "?logged_out=true", capture_ts)
        time.sleep(REPLAY_DELAY)

        if not payload or "menuItems" not in payload:
            continue

        for menu_item in payload["menuItems"]:
            record = _menu_item_to_record(menu_item, capture_ts)
            if not record or record["slug"] in seen_slugs:
                continue
            seen_slugs.add(record["slug"])
            found.append(record)
            logger.debug(" + %s", record["slug"])

        logger.info(" %d new slugs (total so far: %d)", len(found), len(seen_slugs))

    return found
|
||||||
|
|
||||||
|
|
||||||
|
def pass_product_api(seen_slugs: set[str]) -> list[dict]:
    """Pick up any directly archived /api/v1/products/* URLs the menu pass missed.

    Only the slug and capture timestamp are known from CDX, so the remaining
    record fields are left empty.

    The query string is removed before the slug is taken from the URL, in
    line with the other discovery passes; otherwise a capture such as
    ``.../products/foo?x=1`` would be recorded under the bogus slug
    ``foo?x=1``.

    Args:
        seen_slugs: Slugs already known. Updated in place with every slug
            added by this pass.

    Returns:
        Records (``source="product_api"``) for slugs that were not in
        ``seen_slugs`` on entry.
    """
    records: list[dict] = []

    product_cdx = cdx_query(f"{PC_HOST}/api/v1/products/*", limit="5000")
    logger.info("CDX: %d archived product API URLs found", len(product_cdx))
    time.sleep(CDX_DELAY)

    for entry in product_cdx:
        # Drop any query string first so it cannot leak into the slug.
        slug = entry["original"].split("?")[0].rstrip("/").split("/")[-1]
        if not slug or slug in seen_slugs:
            continue
        seen_slugs.add(slug)
        records.append({
            "slug": slug,
            "title": "",
            "subtitle": "",
            "cook_time": "",
            "tags": [],
            "serving_size": None,
            "image_url": "",
            "description": "",
            "wayback_ts": entry["timestamp"],
            "source": "product_api",
        })

    logger.info("product_api pass: %d new slugs", len(records))
    return records
|
||||||
|
|
||||||
|
|
||||||
|
def pass_category_pages(seen_slugs: set[str]) -> list[dict]:
    """Scan archived recipe-categories HTML pages for slugs the API passes missed.

    Each distinct category URL (query string removed) is replayed once from
    the Wayback Machine and its HTML is searched with a regex for
    ``/recipe/<slug>`` references. Pages that do not return 200, or whose
    fetch raises, are skipped.

    Args:
        seen_slugs: Slugs already known. Updated in place with every slug
            added by this pass.

    Returns:
        Records (``source="category_page"``) for slugs that were not in
        ``seen_slugs`` on entry; only slug and timestamp are populated.
    """
    import re

    slug_pattern = re.compile(r'["\s]/recipe/([a-z0-9][a-z0-9\-]{3,})["\s/?]')
    found: list[dict] = []

    captures = cdx_query(f"{PC_HOST}/recipe-categories/*", limit="200")
    logger.info("CDX: %d archived category pages found", len(captures))
    time.sleep(CDX_DELAY)

    visited_urls: set[str] = set()

    for capture in captures:
        page_url = capture["original"].split("?")[0]
        if page_url in visited_urls:
            continue
        visited_urls.add(page_url)

        replay_url = f"{WB_BASE}/{capture['timestamp']}/{page_url}"
        try:
            response = requests.get(replay_url, timeout=30)
            time.sleep(REPLAY_DELAY)
        except Exception as exc:
            logger.warning("Category page fetch failed: %s", exc)
            continue
        if response.status_code != 200:
            continue

        for slug in slug_pattern.findall(response.text):
            if slug in seen_slugs:
                continue
            seen_slugs.add(slug)
            found.append(
                {
                    "slug": slug,
                    "title": "",
                    "subtitle": "",
                    "cook_time": "",
                    "tags": [],
                    "serving_size": None,
                    "image_url": "",
                    "description": "",
                    "wayback_ts": capture["timestamp"],
                    "source": "category_page",
                }
            )

    logger.info("category_pages pass: %d new slugs", len(found))
    return found
|
||||||
|
|
||||||
|
|
||||||
|
# ── Main ──────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def discover(out_file: Path) -> None:
    """Run all discovery passes and append newly found records to out_file.

    Records already in ``out_file`` (JSONL, one object per non-blank line)
    are loaded first so their slugs are skipped, which makes reruns
    incremental. The passes run in order: menus, product API, category
    pages. New records are appended, creating parent directories if needed.

    Args:
        out_file: Path of the JSONL manifest to read and append to.
    """
    known_slugs: set[str] = set()
    previous: list[dict] = []

    # Seed the slug set from any earlier run.
    if out_file.exists():
        with open(out_file) as manifest:
            for raw_line in manifest:
                stripped = raw_line.strip()
                if not stripped:
                    continue
                record = json.loads(stripped)
                known_slugs.add(record["slug"])
                previous.append(record)
        logger.info("Loaded %d existing slugs from %s", len(known_slugs), out_file)

    discovered: list[dict] = []
    for run_pass in (pass_menus, pass_product_api, pass_category_pages):
        discovered.extend(run_pass(known_slugs))

    out_file.parent.mkdir(parents=True, exist_ok=True)
    with open(out_file, "a") as manifest:
        for record in discovered:
            manifest.write(json.dumps(record) + "\n")

    grand_total = len(previous) + len(discovered)
    logger.info(
        "Done. %d new slugs written to %s (%d total).",
        len(discovered), out_file, grand_total,
    )
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
    """Parse command-line options, configure logging, and run discovery.

    Options:
        --out    Path of the JSONL manifest (defaults to ``DEFAULT_OUT``).
        --debug  Log at DEBUG level instead of INFO.
    """
    cli = argparse.ArgumentParser(description="Discover Purple Carrot recipe slugs via Wayback")
    cli.add_argument(
        "--out",
        type=Path,
        default=DEFAULT_OUT,
        help=f"Output JSONL manifest (default: {DEFAULT_OUT})",
    )
    cli.add_argument("--debug", action="store_true")
    options = cli.parse_args()

    log_level = logging.DEBUG if options.debug else logging.INFO
    logging.basicConfig(
        level=log_level,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )

    discover(options.out)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
529
scripts/pipeline/purple_carrot/scrape_recipes.py
Normal file
529
scripts/pipeline/purple_carrot/scrape_recipes.py
Normal file
|
|
@ -0,0 +1,529 @@
|
||||||
|
"""
|
||||||
|
scrape_recipes.py — fetch full recipe data for slugs in pc_slugs.jsonl.
|
||||||
|
|
||||||
|
For each slug:
|
||||||
|
1. Try Wayback /api/v1/products/<slug> — oldest capture first (pre-HelloFresh
|
||||||
|
acquisition data is more complete).
|
||||||
|
2. If instructions are empty, try the recipe HTML page via Wayback and parse
|
||||||
|
inline JSON state or structured markup.
|
||||||
|
3. Merge with metadata already in the manifest (title, tags, cook_time, etc.)
|
||||||
|
4. Emit one row per recipe to recipes_purplecarrot.parquet in food.com columnar
|
||||||
|
format so build_recipe_index.py can import it unchanged.
|
||||||
|
|
||||||
|
Output columns (food.com schema + PC extras ignored by the indexer):
|
||||||
|
RecipeId, Name, Subtitle, RecipeIngredientParts, RecipeInstructions,
|
||||||
|
RecipeCategory, Keywords, Calories, FatContent, ProteinContent,
|
||||||
|
SodiumContent, SugarContent, CarbohydrateContent, FiberContent,
|
||||||
|
RecipeServings, Description, ImageURL, CookTime, Slug, Source
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
conda run -n cf python -m scripts.pipeline.purple_carrot.scrape_recipes
|
||||||
|
conda run -n cf python -m scripts.pipeline.purple_carrot.scrape_recipes \\
|
||||||
|
--slugs /Library/Assets/kiwi/pipeline/pc_slugs.jsonl \\
|
||||||
|
--out /Library/Assets/kiwi/pipeline/recipes_purplecarrot.parquet \\
|
||||||
|
--resume
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import re
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import requests
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
CDX_BASE = "http://web.archive.org/cdx/search/cdx"
|
||||||
|
WB_BASE = "https://web.archive.org/web"
|
||||||
|
PC_HOST = "www.purplecarrot.com"
|
||||||
|
|
||||||
|
REPLAY_DELAY = 1.2
|
||||||
|
CDX_DELAY = 0.5
|
||||||
|
|
||||||
|
DEFAULT_SLUGS = Path("/Library/Assets/kiwi/pipeline/pc_slugs.jsonl")
|
||||||
|
DEFAULT_OUT = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot.parquet")
|
||||||
|
|
||||||
|
# Inline JSON state embedded by the SSR renderer — used as fallback HTML parser
|
||||||
|
_NEXT_DATA_RE = re.compile(r'<script id="__NEXT_DATA__"[^>]*>(.*?)</script>', re.DOTALL)
|
||||||
|
_REDUX_STATE_RE = re.compile(r'window\.__INITIAL_STATE__\s*=\s*(\{.*?\});\s*\n', re.DOTALL)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Wayback helpers ───────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _cdx_timestamps(slug: str) -> list[str]:
    """Return capture timestamps for a product slug's API URL.

    Queries CDX for up to 20 status-200 captures of
    ``/api/v1/products/<slug>`` and returns the timestamp column in the order
    CDX lists it (presumably oldest first — relies on CDX default ordering).
    Any failure is logged at debug level and yields an empty list.
    """
    target = f"{PC_HOST}/api/v1/products/{slug}"
    query = {
        "url": target,
        "output": "json",
        "fl": "timestamp,statuscode",
        "filter": "statuscode:200",
        "limit": "20",
    }
    try:
        response = requests.get(CDX_BASE, params=query, timeout=20)
        response.raise_for_status()
        table = response.json()
        # Row 0 is the header; anything shorter has no captures.
        if len(table) < 2:
            return []
        return [capture[0] for capture in table[1:]]  # timestamps only, oldest first
    except Exception as exc:
        logger.debug("CDX timestamps failed for %s: %s", slug, exc)
        return []
|
||||||
|
|
||||||
|
|
||||||
|
def _wayback_json(url: str, timestamp: str) -> Any | None:
    """Fetch a Wayback replay of url at timestamp and return its parsed JSON.

    Makes up to three attempts. A 200 response is decoded as JSON and
    returned; 404 and 410 end the attempts immediately with None. Any other
    status code, a request error, or an undecodable body counts as a failed
    attempt and is followed by an exponential back-off before the next try,
    so throttling or server errors are not answered with an immediate
    re-request.

    Args:
        url: Original (archived) URL to replay.
        timestamp: Wayback capture timestamp placed in the replay URL.

    Returns:
        The decoded JSON value, or None when the capture is gone or all
        attempts fail.
    """
    max_attempts = 3
    replay = f"{WB_BASE}/{timestamp}/{url}"
    for attempt in range(max_attempts):
        try:
            resp = requests.get(replay, timeout=30)
            if resp.status_code == 200:
                return resp.json()
            if resp.status_code in (404, 410):
                return None
            logger.debug(
                "Wayback JSON attempt %d got HTTP %d (%s)",
                attempt + 1, resp.status_code, url,
            )
        except Exception as exc:
            # Best-effort fetch: log and retry rather than abort the scrape.
            logger.debug("Wayback JSON attempt %d failed (%s): %s", attempt + 1, url, exc)
        # Back off after every failed attempt except the last one.
        if attempt < max_attempts - 1:
            time.sleep(2 ** attempt)
    return None
|
||||||
|
|
||||||
|
|
||||||
|
def _wayback_html(url: str, timestamp: str) -> str | None:
    """Fetch a Wayback replay of url at timestamp and return the response text.

    Makes up to three attempts. A 200 response returns its text; 404 and 410
    end the attempts immediately with None. Any other status code or a
    request error counts as a failed attempt and is followed by an
    exponential back-off before the next try, so throttling or server errors
    are not answered with an immediate re-request.

    Args:
        url: Original (archived) URL to replay.
        timestamp: Wayback capture timestamp placed in the replay URL.

    Returns:
        The page text, or None when the capture is gone or all attempts fail.
    """
    max_attempts = 3
    replay = f"{WB_BASE}/{timestamp}/{url}"
    for attempt in range(max_attempts):
        try:
            resp = requests.get(replay, timeout=30)
            if resp.status_code == 200:
                return resp.text
            if resp.status_code in (404, 410):
                return None
            logger.debug(
                "Wayback HTML attempt %d got HTTP %d (%s)",
                attempt + 1, resp.status_code, url,
            )
        except Exception as exc:
            # Best-effort fetch: log and retry rather than abort the scrape.
            logger.debug("Wayback HTML attempt %d failed (%s): %s", attempt + 1, url, exc)
        # Back off after every failed attempt except the last one.
        if attempt < max_attempts - 1:
            time.sleep(2 ** attempt)
    return None
|
||||||
|
|
||||||
|
|
||||||
|
# ── Recipe extraction from API JSON ──────────────────────────────────────────
|
||||||
|
|
||||||
|
def _extract_from_api(data: dict) -> dict | None:
    """Parse a /api/v1/products/<slug> response into our recipe dict.

    Only the first entry of ``skus`` is inspected for instructions,
    ingredients, nutrition and images.

    Returns None only when *data* is empty or not a dict. Otherwise returns
    a dict that may be partially populated — ``has_full_recipe`` is True
    only when both steps and ingredients were found — and the caller merges
    it with manifest metadata.
    """
    if not data or not isinstance(data, dict):
        return None

    # JSON null arrives as None, so coerce before calling str methods.
    title = str(data.get("title") or "").strip()
    subtitle = str(data.get("subtitle") or "").strip()
    slug = data.get("slug") or ""

    skus = data.get("skus") or []
    first_sku = skus[0] if isinstance(skus, list) and skus else None
    sku = first_sku if isinstance(first_sku, dict) else {}

    def _step_order(step: dict) -> float:
        # Missing or non-numeric step numbers sort first; sorted() is
        # stable, so such steps keep their original relative order.
        try:
            return float(step.get("step_number") or 0)
        except (TypeError, ValueError):
            return 0.0

    # Instructions: list of {step_number, title, description}
    raw_instructions = sku.get("instructions") or []
    step_dicts = [s for s in raw_instructions if isinstance(s, dict)]
    steps: list[str] = []
    for step in sorted(step_dicts, key=_step_order):
        parts = [str(step[field]) for field in ("title", "description") if step.get(field)]
        if parts:
            steps.append(". ".join(parts))

    # Ingredients: may be in ingredients_quantity or ingredients
    raw_ingr = sku.get("ingredients_quantity") or sku.get("ingredients") or []
    ingredients: list[str] = []
    for item in raw_ingr:
        if isinstance(item, dict):
            qty = item.get("quantity") or item.get("qty") or ""
            unit = item.get("unit") or ""
            nested = item.get("ingredient")
            # Explicit fallback chain. The previous one-liner mixed `or`
            # with a conditional expression, which binds looser than `or`,
            # so item["name"] was ignored unless a nested dict was present.
            name = (
                item.get("name")
                or (nested.get("name") if isinstance(nested, dict) else "")
                or item.get("ingredient_name")
                or ""
            )
            raw = item.get("raw") or item.get("display_name") or ""
            line = raw or " ".join(str(p) for p in (qty, unit, name) if p).strip()
            if line:
                ingredients.append(line)
        elif isinstance(item, str) and item.strip():
            ingredients.append(item.strip())

    nutrition = sku.get("nutrition_label") or {}
    if not isinstance(nutrition, dict):
        nutrition = {}
    calories = _num(nutrition.get("calories") or sku.get("calories"))
    fat = _num(nutrition.get("total_fat") or sku.get("fat"))
    protein = _num(nutrition.get("protein") or sku.get("protein"))
    sodium = _num(nutrition.get("sodium") or sku.get("sodium"))
    sugar = _num(nutrition.get("sugar") or nutrition.get("total_sugars"))
    carbs = _num(nutrition.get("total_carbohydrate") or sku.get("carbs"))
    fiber = _num(nutrition.get("dietary_fiber") or sku.get("fiber"))

    tags = sku.get("tags") or data.get("tags") or []
    category = sku.get("meal_type") or sku.get("product_type") or ""
    servings = _num(sku.get("servings"))

    cook_time = sku.get("prep_and_cook_time") or ""
    description = sku.get("description") or ""

    images = sku.get("hero_images") or sku.get("image_versions") or []
    image_url = ""
    if isinstance(images, list) and images and isinstance(images[0], dict):
        image_url = images[0].get("image_url") or images[0].get("url") or ""
    if not image_url and data.get("square_image"):
        sq = data["square_image"]
        # Keep image_url a string even when the dict has no "url" key.
        image_url = (sq.get("url") or "") if isinstance(sq, dict) else ""

    return {
        "slug": slug,
        "title": title,
        "subtitle": subtitle,
        "steps": steps,
        "ingredients": ingredients,
        "category": category,
        "tags": tags,
        "calories": calories,
        "fat": fat,
        "protein": protein,
        "sodium": sodium,
        "sugar": sugar,
        "carbs": carbs,
        "fiber": fiber,
        "servings": servings,
        "cook_time": cook_time,
        "description": description,
        "image_url": image_url,
        "has_full_recipe": bool(steps and ingredients),
    }
|
||||||
|
|
||||||
|
|
||||||
|
def _num(val: Any) -> float | None:
|
||||||
|
if val is None:
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
v = float(str(val).replace("g", "").replace("mg", "").split()[0])
|
||||||
|
return v if v > 0 else None
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
# ── Fallback: HTML inline state parsing ──────────────────────────────────────
|
||||||
|
|
||||||
|
def _extract_from_html(html: str, slug: str) -> dict | None:
    """Try to pull recipe data from inline state embedded in an archived page.

    Three sources are tried in order; the first that yields a recipe wins:

    1. The ``__NEXT_DATA__`` blob (``props.pageProps.recipe`` / ``.product``)
    2. The Redux ``__INITIAL_STATE__`` blob (a few common top-level keys)
    3. A JSON-LD ``<script>`` whose ``@type`` is ``Recipe``

    Sources 1 and 2 are handed to ``_extract_from_api``. Source 3 is mapped
    field by field, using *slug* as the record's slug, and is flagged
    ``has_full_recipe`` only when it carries both steps and ingredients.

    Returns None when no source contains a recipe.
    """
    # Attempt 1: Next.js __NEXT_DATA__
    m = _NEXT_DATA_RE.search(html)
    if m:
        try:
            state = json.loads(m.group(1))
            # Walk the Next.js page props tree looking for recipe data
            props = state.get("props", {}).get("pageProps", {})
            recipe = props.get("recipe") or props.get("product")
            if recipe and isinstance(recipe, dict) and recipe.get("title"):
                return _extract_from_api(recipe)
        except Exception as exc:
            logger.debug("[%s] __NEXT_DATA__ parse failed: %s", slug, exc)

    # Attempt 2: Redux __INITIAL_STATE__
    m = _REDUX_STATE_RE.search(html)
    if m:
        try:
            state = json.loads(m.group(1))
            # Try common Redux state shapes
            for key in ("recipe", "product", "currentRecipe", "currentProduct"):
                recipe = state.get(key)
                if recipe and isinstance(recipe, dict) and recipe.get("title"):
                    return _extract_from_api(recipe)
        except Exception as exc:
            logger.debug("[%s] __INITIAL_STATE__ parse failed: %s", slug, exc)

    # Attempt 3: JSON-LD structured data
    ld_matches = re.findall(
        r'<script[^>]+type=["\']application/ld\+json["\'][^>]*>(.*?)</script>',
        html, re.DOTALL
    )
    for raw in ld_matches:
        try:
            ld = json.loads(raw)
            if isinstance(ld, list):
                ld = next(
                    (x for x in ld if isinstance(x, dict) and x.get("@type") == "Recipe"),
                    None,
                )
            if not isinstance(ld, dict) or ld.get("@type") != "Recipe":
                continue

            # A bare string must be wrapped, otherwise the loop below would
            # iterate it character by character.
            instructions = ld.get("recipeInstructions") or []
            if isinstance(instructions, str):
                instructions = [instructions]
            steps = []
            for inst in instructions:
                if isinstance(inst, dict):
                    text = inst.get("text") or ""
                elif isinstance(inst, str):
                    text = inst
                else:
                    continue
                if text:
                    steps.append(text)

            ingredients = ld.get("recipeIngredient") or []
            if isinstance(ingredients, str):
                ingredients = [ingredients]
            ingredients = [i for i in ingredients if i]

            keywords = ld.get("keywords")
            if isinstance(keywords, str):
                tags = [k.strip() for k in keywords.split(",") if k.strip()]
            elif isinstance(keywords, list):
                tags = [str(k).strip() for k in keywords if str(k).strip()]
            else:
                tags = []

            # "image" may be a URL string, a list, or an object with "url".
            image = ld.get("image")
            if isinstance(image, list):
                image = image[0] if image else ""
            if isinstance(image, dict):
                image = image.get("url") or ""
            image_url = image if isinstance(image, str) else ""

            nutrition = ld.get("nutrition")
            if not isinstance(nutrition, dict):
                nutrition = {}

            recipe_yield = ld.get("recipeYield")
            if isinstance(recipe_yield, list):
                recipe_yield = recipe_yield[0] if recipe_yield else None

            return {
                "slug": slug,
                "title": ld.get("name", ""),
                "subtitle": "",
                "steps": steps,
                "ingredients": ingredients,
                "category": ld.get("recipeCategory", ""),
                "tags": tags,
                "calories": _num(nutrition.get("calories")),
                "fat": None, "protein": None, "sodium": None,
                "sugar": None, "carbs": None, "fiber": None,
                "servings": _num(recipe_yield),
                "cook_time": str(ld.get("totalTime") or ld.get("cookTime") or ""),
                "description": ld.get("description", ""),
                "image_url": image_url,
                # Same rule as the API extractor: a record is "full" only
                # when it has both steps and ingredients.
                "has_full_recipe": bool(steps and ingredients),
            }
        except Exception as exc:
            logger.debug("[%s] JSON-LD parse failed: %s", slug, exc)

    return None
|
||||||
|
|
||||||
|
|
||||||
|
# ── Per-slug fetch ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def fetch_recipe(slug: str, manifest_meta: dict) -> dict | None:
    """Fetch the fullest available recipe data for a slug from Wayback.

    Strategy:

    1. Replay archived product-API captures (timestamps from
       ``_cdx_timestamps``, falling back to the manifest's ``wayback_ts``)
       until one yields both steps and ingredients.
    2. If none does, replay up to five archived HTML recipe pages and parse
       their inline state. A full HTML recipe takes priority, but fields it
       lacks are filled from the partial API record when there is one.
    3. Overlay the extracted data on top of the manifest metadata.

    Returns a merged dict of manifest metadata + API/HTML-extracted content,
    or None when no title is available from either source.
    """
    api_url = f"https://{PC_HOST}/api/v1/products/{slug}"
    html_url = f"https://{PC_HOST}/recipe/{slug}"

    recipe: dict | None = None

    # Try product API — oldest captures are most likely to have full data
    timestamps = _cdx_timestamps(slug)
    time.sleep(CDX_DELAY)

    if not timestamps and manifest_meta.get("wayback_ts"):
        timestamps = [manifest_meta["wayback_ts"]]

    for ts in timestamps:
        data = _wayback_json(api_url, ts)
        time.sleep(REPLAY_DELAY)
        if not data:
            continue
        candidate = _extract_from_api(data)
        if not candidate:
            continue
        recipe = candidate
        if recipe.get("has_full_recipe"):
            logger.debug("[%s] Full recipe from API (ts=%s)", slug, ts)
            break
        logger.debug("[%s] Partial API data (ts=%s) — trying HTML fallback", slug, ts)

    # HTML fallback when API has no steps/ingredients
    if not recipe or not recipe.get("has_full_recipe"):
        html_cdx_url = f"{PC_HOST}/recipe/{slug}"
        try:
            html_resp = requests.get(
                CDX_BASE,
                params={
                    "url": html_cdx_url,
                    "output": "json",
                    "fl": "timestamp,statuscode",
                    "filter": "statuscode:200",
                    "limit": "5",
                },
                timeout=20,
            )
            html_ts_rows = html_resp.json() if html_resp.ok else []
            # Row 0 is skipped, as in the API timestamp lookup.
            html_timestamps = [row[0] for row in html_ts_rows[1:]] if len(html_ts_rows) > 1 else []
        except Exception as exc:
            logger.debug("[%s] CDX lookup for HTML page failed: %s", slug, exc)
            html_timestamps = []
        time.sleep(CDX_DELAY)

        for ts in html_timestamps:
            html = _wayback_html(html_url, ts)
            time.sleep(REPLAY_DELAY)
            if not html:
                continue
            html_recipe = _extract_from_html(html, slug)
            if html_recipe and html_recipe.get("has_full_recipe"):
                logger.debug("[%s] Full recipe from HTML (ts=%s)", slug, ts)
                # HTML wins, but keep anything only the partial API capture
                # had (e.g. nutrition) instead of discarding it.
                combined = dict(html_recipe)
                for key, val in (recipe or {}).items():
                    if val and not combined.get(key):
                        combined[key] = val
                recipe = combined
                break

    # Build merged record: manifest metadata fills any gaps from API/HTML
    merged: dict = {
        "slug": slug,
        "title": manifest_meta.get("title", ""),
        "subtitle": manifest_meta.get("subtitle", ""),
        "steps": [],
        "ingredients": [],
        "category": "",
        "tags": manifest_meta.get("tags") or [],
        "calories": None,
        "fat": None,
        "protein": None,
        "sodium": None,
        "sugar": None,
        "carbs": None,
        "fiber": None,
        "servings": manifest_meta.get("serving_size"),
        "cook_time": manifest_meta.get("cook_time", ""),
        "description": manifest_meta.get("description", ""),
        "image_url": manifest_meta.get("image_url", ""),
        "source": "purple_carrot",
        "wayback_ts": manifest_meta.get("wayback_ts", ""),
        "has_full_recipe": False,
    }

    if recipe:
        for key, val in recipe.items():
            # Prefer API/HTML data; keep manifest value only when API field is empty
            if val or key not in merged or not merged[key]:
                merged[key] = val

    if not merged["title"]:
        logger.warning("[%s] No title — skipping", slug)
        return None

    return merged
|
||||||
|
|
||||||
|
|
||||||
|
# ── Output formatting ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _to_dataframe_row(r: dict) -> dict:
|
||||||
|
"""Convert merged recipe dict to food.com-compatible parquet row."""
|
||||||
|
# Build plain-text input for allrecipes-style corpus compatibility
|
||||||
|
lines = [r["title"]]
|
||||||
|
if r.get("subtitle"):
|
||||||
|
lines.append(r["subtitle"])
|
||||||
|
if r.get("description"):
|
||||||
|
lines.append("")
|
||||||
|
lines.append(r["description"])
|
||||||
|
if r.get("ingredients"):
|
||||||
|
lines += ["", "Ingredients:"] + [f"- {i}" for i in r["ingredients"]]
|
||||||
|
if r.get("steps"):
|
||||||
|
lines += ["", "Directions:"] + [f"- {s}" for s in r["steps"]]
|
||||||
|
plain_text = "\n".join(lines)
|
||||||
|
|
||||||
|
source_url = f"https://www.purplecarrot.com/recipe/{r['slug']}"
|
||||||
|
|
||||||
|
return {
|
||||||
|
# food.com schema columns (used by build_recipe_index.py)
|
||||||
|
"RecipeId": f"pc_{r['slug']}",
|
||||||
|
"Name": r["title"],
|
||||||
|
"RecipeIngredientParts": r.get("ingredients") or [],
|
||||||
|
"RecipeInstructions": r.get("steps") or [],
|
||||||
|
"RecipeCategory": r.get("category", ""),
|
||||||
|
"Keywords": r.get("tags") or [],
|
||||||
|
"Calories": r.get("calories"),
|
||||||
|
"FatContent": r.get("fat"),
|
||||||
|
"ProteinContent": r.get("protein"),
|
||||||
|
"SodiumContent": r.get("sodium"),
|
||||||
|
"SugarContent": r.get("sugar"),
|
||||||
|
"CarbohydrateContent": r.get("carbs"),
|
||||||
|
"FiberContent": r.get("fiber"),
|
||||||
|
"RecipeServings": r.get("servings"),
|
||||||
|
# PC-specific extras (ignored by indexer, used by training pipeline)
|
||||||
|
"Subtitle": r.get("subtitle", ""),
|
||||||
|
"Description": r.get("description", ""),
|
||||||
|
"ImageURL": r.get("image_url", ""),
|
||||||
|
"CookTime": r.get("cook_time", ""),
|
||||||
|
"Slug": r["slug"],
|
||||||
|
"Source": "purple_carrot",
|
||||||
|
"SourceURL": source_url, # canonical attribution link shown in recipe UI
|
||||||
|
"HasFullRecipe": r.get("has_full_recipe", False),
|
||||||
|
"WaybackTs": r.get("wayback_ts", ""),
|
||||||
|
# Also emit plain-text input for allrecipes-compatible corpus search
|
||||||
|
"input": plain_text,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# ── Main ──────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def scrape(slugs_file: Path, out_file: Path, resume: bool = True) -> None:
    """Fetch every recipe listed in the slug manifest and write a parquet file.

    Args:
        slugs_file: JSON-lines manifest; each record must carry a ``slug``.
            When a slug appears more than once, a record whose ``source`` is
            ``"menu"`` replaces the earlier one. Malformed lines are skipped
            with a warning.
        out_file: Destination parquet. Rewritten every 50 processed slugs
            as a checkpoint and once more at the end.
        resume: When True and *out_file* exists, slugs already present in
            its ``Slug`` column are not fetched again and their rows are
            carried over.

    If the run is interrupted (Ctrl-C or an unexpected error), any rows
    fetched so far are written before the exception propagates.
    """
    import pandas as pd

    # Load manifest
    if not slugs_file.exists():
        logger.error("Slugs manifest not found: %s", slugs_file)
        return

    manifest: dict[str, dict] = {}
    with open(slugs_file, encoding="utf-8") as f:
        for line_no, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                continue
            try:
                rec = json.loads(line)
                slug = rec["slug"]
            except (ValueError, KeyError, TypeError) as exc:
                # One bad record should not abort the whole run.
                logger.warning("Skipping malformed manifest line %d: %s", line_no, exc)
                continue
            # Keep the richest metadata if slug appears from multiple sources
            if slug not in manifest or rec.get("source") == "menu":
                manifest[slug] = rec

    logger.info("Manifest: %d unique slugs", len(manifest))

    # Load already-scraped slugs for resume
    done_slugs: set[str] = set()
    existing_rows: list[dict] = []
    if resume and out_file.exists():
        try:
            existing_df = pd.read_parquet(out_file)
            done_slugs = set(existing_df["Slug"].tolist())
            existing_rows = existing_df.to_dict("records")
            logger.info("Resume: %d already scraped", len(done_slugs))
        except Exception as exc:
            logger.warning("Could not load existing parquet for resume: %s", exc)

    todo = [s for s in manifest if s not in done_slugs]
    logger.info("%d slugs to fetch", len(todo))

    rows = list(existing_rows)
    try:
        for i, slug in enumerate(todo, 1):
            logger.info("[%d/%d] %s", i, len(todo), slug)
            recipe = fetch_recipe(slug, manifest[slug])
            if recipe:
                rows.append(_to_dataframe_row(recipe))
                status = "full" if recipe.get("has_full_recipe") else "partial"
                logger.info("  -> %s (%s)", recipe.get("title", "?"), status)
            else:
                logger.warning("  -> skipped (no title)")

            # Write checkpoint every 50 recipes
            if i % 50 == 0:
                _write_parquet(rows, out_file)
                logger.info("Checkpoint: %d recipes written", len(rows))
    except BaseException:
        # Includes KeyboardInterrupt: save anything fetched since the last
        # checkpoint, then let the original exception propagate.
        if len(rows) > len(existing_rows):
            _write_parquet(rows, out_file)
            logger.warning("Interrupted: %d recipes saved to %s", len(rows), out_file)
        raise

    _write_parquet(rows, out_file)
    full = sum(1 for r in rows if r.get("HasFullRecipe"))
    logger.info(
        "Done. %d recipes written to %s (%d full, %d partial).",
        len(rows), out_file, full, len(rows) - full,
    )
|
||||||
|
|
||||||
|
|
||||||
|
def _write_parquet(rows: list[dict], out_file: Path) -> None:
    """Write *rows* to *out_file* as parquet, creating parent directories.

    The frame is written to a sibling ``.tmp`` file first and then moved
    over the destination, so an interruption mid-write cannot leave a
    truncated parquet where a previous good file used to be.
    """
    import pandas as pd

    out_file.parent.mkdir(parents=True, exist_ok=True)
    tmp_file = out_file.with_name(out_file.name + ".tmp")
    try:
        pd.DataFrame(rows).to_parquet(tmp_file, index=False)
        # Path.replace overwrites an existing destination.
        tmp_file.replace(out_file)
    finally:
        # After a successful replace the temp file is already gone; this
        # only removes a leftover from a failed write.
        tmp_file.unlink(missing_ok=True)
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
    """Parse command-line options, configure logging, and run the scraper."""
    cli = argparse.ArgumentParser(description="Scrape Purple Carrot recipes from Wayback")
    cli.add_argument("--slugs", type=Path, default=DEFAULT_SLUGS)
    cli.add_argument("--out", type=Path, default=DEFAULT_OUT)
    cli.add_argument(
        "--no-resume",
        action="store_false",
        dest="resume",
        help="Start fresh (ignore existing parquet)",
    )
    cli.add_argument("--debug", action="store_true")
    opts = cli.parse_args()

    # --debug raises verbosity from INFO to DEBUG.
    if opts.debug:
        log_level = logging.DEBUG
    else:
        log_level = logging.INFO
    logging.basicConfig(
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
        level=log_level,
    )

    scrape(opts.slugs, opts.out, resume=opts.resume)
|
||||||
|
|
||||||
|
|
||||||
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
||||||
Loading…
Reference in a new issue