Compare commits
No commits in common. "main" and "v0.9.2" have entirely different histories.
14 changed files with 30 additions and 1877 deletions
13
.env.example
13
.env.example
|
|
@ -21,12 +21,10 @@ DATA_DIR=./data
|
|||
# IP this machine advertises to the coordinator (must be reachable from coordinator host)
|
||||
# CF_ORCH_ADVERTISE_HOST=10.1.10.71
|
||||
|
||||
# GPU inference server (cf-orch coordinator for recipe scan, LLM generation, etc.)
|
||||
# GPU_SERVER_URL: set to your local cf-orch coordinator (self-hosted rack).
|
||||
# CF_ORCH_URL is the backward-compat alias — both are honoured.
|
||||
# Paid+ default: when CF_LICENSE_KEY is present and neither URL is set,
|
||||
# the app automatically points to https://orch.circuitforge.tech.
|
||||
# GPU_SERVER_URL=http://10.1.10.71:7700
|
||||
# CF-core hosted coordinator (managed cloud GPU inference — Paid+ tier)
|
||||
# Set CF_ORCH_URL to use a hosted cf-orch coordinator instead of self-hosting.
|
||||
# CF_LICENSE_KEY is read automatically by CFOrchClient for bearer auth.
|
||||
# CF_ORCH_URL=https://orch.circuitforge.tech
|
||||
# CF_LICENSE_KEY=CFG-KIWI-xxxx-xxxx-xxxx
|
||||
|
||||
# LLM backend — env-var auto-config (no llm.yaml needed for bare-metal users)
|
||||
|
|
@ -59,9 +57,6 @@ CF_APP_NAME=kiwi
|
|||
# Unset = auto-detect: true if CLOUD_MODE or circuitforge_orch is installed (paid+ local).
|
||||
# Set false to force LocalScheduler even when cf-orch is present.
|
||||
# USE_ORCH_SCHEDULER=false
|
||||
# GPU_SERVER_URL: cf-orch coordinator endpoint. Required for recipe scan (cf-docuvision)
|
||||
# and LLM features on a self-hosted rack. CF_ORCH_URL is the backward-compat alias.
|
||||
# GPU_SERVER_URL=http://10.1.10.71:7700
|
||||
|
||||
# Cloud mode (set in compose.cloud.yml; also set here for reference)
|
||||
# CLOUD_DATA_ROOT=/devl/kiwi-cloud-data
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@
|
|||
|
||||
[](#license)
|
||||
[](https://git.opensourcesolarpunk.com/Circuit-Forge/kiwi/actions)
|
||||
[](https://git.opensourcesolarpunk.com/Circuit-Forge/kiwi/releases)
|
||||
[](https://git.opensourcesolarpunk.com/Circuit-Forge/kiwi/releases)
|
||||
|
||||
[Documentation](https://docs.circuitforge.tech/kiwi) · [Live demo](https://menagerie.circuitforge.tech/kiwi) · [circuitforge.tech](https://circuitforge.tech)
|
||||
|
||||
|
|
@ -113,6 +113,4 @@ Kiwi uses a split license:
|
|||
- **Discovery and inventory pipeline** (barcode scan, expiry tracking, pantry CRUD, CSV export, recipe browser): [MIT](LICENSE-MIT)
|
||||
- **AI features** (receipt OCR, LLM recipe suggestions, style auto-classifier): [BSL 1.1](LICENSE-BSL) — free for personal non-commercial self-hosting; commercial use or SaaS re-hosting requires a paid license. Converts to MIT after 4 years.
|
||||
|
||||
Humans own design, architecture, code review, testing, and verification. LLMs are part of our development workflow. [Our positions on LLM use →](https://circuitforge.tech/positions)
|
||||
|
||||
Privacy · Safety · Accessibility — co-equal, non-negotiable across all CircuitForge products.
|
||||
|
|
|
|||
|
|
@ -65,24 +65,9 @@ class Settings:
|
|||
# Quality
|
||||
MIN_QUALITY_SCORE: float = float(os.environ.get("MIN_QUALITY_SCORE", "50.0"))
|
||||
|
||||
# CF-core resource coordinator (VRAM lease management — lease broker, not inference)
|
||||
# CF-core resource coordinator (VRAM lease management)
|
||||
COORDINATOR_URL: str = os.environ.get("COORDINATOR_URL", "http://localhost:7700")
|
||||
|
||||
# GPU inference server URL
|
||||
# Priority: GPU_SERVER_URL env var → CF_ORCH_URL env var (backward compat)
|
||||
# → https://orch.circuitforge.tech when CF_LICENSE_KEY is present (Paid+)
|
||||
# Resolved value is written back to os.environ["CF_ORCH_URL"] at startup so
|
||||
# all service-layer callers that read CF_ORCH_URL directly see the right URL.
|
||||
GPU_SERVER_URL: str | None = (
|
||||
os.environ.get("GPU_SERVER_URL")
|
||||
or os.environ.get("CF_ORCH_URL")
|
||||
or (
|
||||
"https://orch.circuitforge.tech"
|
||||
if os.environ.get("CF_LICENSE_KEY")
|
||||
else None
|
||||
)
|
||||
)
|
||||
|
||||
# Hosted cf-orch coordinator — bearer token for managed cloud GPU inference (Paid+)
|
||||
# CFOrchClient reads CF_LICENSE_KEY automatically; exposed here for startup validation.
|
||||
CF_LICENSE_KEY: str | None = os.environ.get("CF_LICENSE_KEY")
|
||||
|
|
@ -123,9 +108,3 @@ class Settings:
|
|||
|
||||
|
||||
settings = Settings()
|
||||
|
||||
# Normalise GPU_SERVER_URL into CF_ORCH_URL so every service-layer caller that
|
||||
# reads os.environ.get("CF_ORCH_URL") sees the resolved value, including the
|
||||
# Paid+ cloud default injected above.
|
||||
if settings.GPU_SERVER_URL:
|
||||
os.environ["CF_ORCH_URL"] = settings.GPU_SERVER_URL
|
||||
|
|
|
|||
|
|
@ -215,35 +215,6 @@ def _build_ocr_extraction_prompt(ocr_text: str) -> str:
|
|||
)
|
||||
|
||||
|
||||
def _call_via_cf_text_vlm(alloc_url: str, image_paths: list[Path], prompt: str) -> str:
|
||||
"""Call the cf-text OpenAI-compat API with images via the llama.cpp multimodal backend."""
|
||||
import httpx
|
||||
|
||||
content: list[dict] = []
|
||||
for i, path in enumerate(image_paths):
|
||||
if i > 0:
|
||||
content.append({"type": "text", "text": f"(Page {i + 1} of the same recipe:)"})
|
||||
b64 = _load_image_b64(path)
|
||||
content.append({
|
||||
"type": "image_url",
|
||||
"image_url": {"url": f"data:image/jpeg;base64,{b64}"},
|
||||
})
|
||||
content.append({"type": "text", "text": prompt})
|
||||
|
||||
resp = httpx.post(
|
||||
f"{alloc_url.rstrip('/')}/v1/chat/completions",
|
||||
json={
|
||||
"model": "local",
|
||||
"messages": [{"role": "user", "content": content}],
|
||||
"max_tokens": 2048,
|
||||
"temperature": 0.0,
|
||||
},
|
||||
timeout=180.0,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
return resp.json()["choices"][0]["message"]["content"].strip()
|
||||
|
||||
|
||||
def _call_vision_backend(
|
||||
image_paths: list[Path],
|
||||
prompt: str,
|
||||
|
|
@ -251,7 +222,7 @@ def _call_vision_backend(
|
|||
) -> str:
|
||||
"""Dispatch to the best available vision backend.
|
||||
|
||||
Priority: cf-orch (Qwen2-VL GGUF via cf-text) -> local Qwen2.5-VL -> Anthropic API.
|
||||
Priority: cf-orch docuvision (OCR + text LLM) -> local Qwen2.5-VL -> Anthropic API.
|
||||
Raises RuntimeError with a clear message when no backend is available.
|
||||
|
||||
Args:
|
||||
|
|
@ -266,8 +237,9 @@ def _call_vision_backend(
|
|||
|
||||
errors: list[str] = []
|
||||
|
||||
# 1. Try cf-orch task allocation → cf-docuvision (Qwen2-VL GGUF via llama.cpp).
|
||||
# Two-step: docuvision OCRs the image(s), then LLMRouter structures the text into JSON.
|
||||
# 1. Try cf-orch task allocation → cf-docuvision OCR, then text LLM structuring.
|
||||
# Two-step: docuvision extracts text from the image(s), then LLMRouter
|
||||
# converts the OCR text to structured recipe JSON using the extraction prompt.
|
||||
cf_orch_url = os.environ.get("CF_ORCH_URL")
|
||||
if cf_orch_url:
|
||||
try:
|
||||
|
|
@ -278,6 +250,7 @@ def _call_vision_backend(
|
|||
try:
|
||||
_progress("allocating", "Starting vision service...")
|
||||
with task_allocate("kiwi", "recipe_scan", service_hint="cf-docuvision", ttl_s=120.0) as alloc:
|
||||
# Step 1: OCR each image via cf-docuvision
|
||||
_progress("scanning", "Extracting recipe text from photo...")
|
||||
doc_client = DocuvisionClient(alloc.url)
|
||||
ocr_parts: list[str] = []
|
||||
|
|
@ -290,11 +263,9 @@ def _call_vision_backend(
|
|||
if not combined_ocr.strip():
|
||||
raise ValueError("Docuvision returned no text — image may not be a recipe")
|
||||
|
||||
# Step 2: Text LLM structures OCR output into recipe JSON
|
||||
_progress("structuring", "Parsing recipe structure...")
|
||||
text = LLMRouter().complete(
|
||||
_build_ocr_extraction_prompt(combined_ocr),
|
||||
system="You are a recipe data extractor. Return ONLY valid JSON. No markdown, no explanation, no code fences.",
|
||||
)
|
||||
text = LLMRouter().complete(_build_ocr_extraction_prompt(combined_ocr))
|
||||
if text:
|
||||
return text
|
||||
|
||||
|
|
@ -332,76 +303,40 @@ def _normalize_ingredient_name(name: str) -> str:
|
|||
return name.lower().strip()
|
||||
|
||||
|
||||
def _extract_json_object(text: str) -> str | None:
|
||||
"""Return the first balanced JSON object from text, or None if not found.
|
||||
|
||||
Uses brace-counting rather than a greedy regex so trailing prose and
|
||||
nested objects are handled correctly.
|
||||
"""
|
||||
start = text.find("{")
|
||||
if start == -1:
|
||||
return None
|
||||
depth = 0
|
||||
in_string = False
|
||||
escape_next = False
|
||||
for i, ch in enumerate(text[start:], start):
|
||||
if escape_next:
|
||||
escape_next = False
|
||||
continue
|
||||
if ch == "\\" and in_string:
|
||||
escape_next = True
|
||||
continue
|
||||
if ch == '"':
|
||||
in_string = not in_string
|
||||
continue
|
||||
if in_string:
|
||||
continue
|
||||
if ch == "{":
|
||||
depth += 1
|
||||
elif ch == "}":
|
||||
depth -= 1
|
||||
if depth == 0:
|
||||
return text[start : i + 1]
|
||||
return None
|
||||
|
||||
|
||||
def _parse_scanner_json(raw_text: str) -> dict:
|
||||
"""Extract and return the JSON dict from VLM output.
|
||||
|
||||
Handles:
|
||||
- Pure JSON
|
||||
- JSON in ```json ... ``` markdown fences
|
||||
- Qwen3-style <think>...</think> or <thinking>...</thinking> preambles
|
||||
- JSON preceded or followed by prose
|
||||
- JSON wrapped in ```json ... ``` markdown fences
|
||||
- JSON preceded by a line of prose ("Here is the recipe: {...}")
|
||||
|
||||
Raises ValueError on not_a_recipe or unparseable output.
|
||||
"""
|
||||
text = raw_text.strip()
|
||||
|
||||
# Strip thinking-token blocks emitted by reasoning models (Qwen3, DeepSeek-R1, etc.)
|
||||
text = re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL | re.IGNORECASE).strip()
|
||||
text = re.sub(r"<thinking>.*?</thinking>", "", text, flags=re.DOTALL | re.IGNORECASE).strip()
|
||||
|
||||
# Strip markdown fences if present
|
||||
if "```" in text:
|
||||
# Find the content between the first ``` pair
|
||||
fence_match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, re.DOTALL)
|
||||
if fence_match:
|
||||
text = fence_match.group(1).strip()
|
||||
if text.startswith("```"):
|
||||
parts = text.split("```")
|
||||
for part in parts:
|
||||
part = part.strip()
|
||||
if part.startswith("json"):
|
||||
part = part[4:].strip()
|
||||
if part.startswith("{"):
|
||||
text = part
|
||||
break
|
||||
|
||||
# Try direct parse
|
||||
# Try direct parse first
|
||||
try:
|
||||
data = json.loads(text)
|
||||
except json.JSONDecodeError:
|
||||
# Fall back to brace-balanced extraction from anywhere in the output
|
||||
candidate = _extract_json_object(text)
|
||||
if not candidate:
|
||||
logger.warning("Could not parse JSON from LLM output (first 400 chars): %r", text[:400])
|
||||
# Extract first JSON object embedded in prose
|
||||
match = re.search(r"\{.*\}", text, re.DOTALL)
|
||||
if not match:
|
||||
raise ValueError(f"Could not parse JSON from VLM output: {text[:200]!r}")
|
||||
try:
|
||||
data = json.loads(candidate)
|
||||
data = json.loads(match.group(0))
|
||||
except json.JSONDecodeError as exc:
|
||||
logger.warning("Brace-extracted JSON still invalid: %r", candidate[:400])
|
||||
raise ValueError(f"Could not parse JSON from VLM output: {exc}") from exc
|
||||
|
||||
if isinstance(data, dict) and data.get("error") == "not_a_recipe":
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
|
|||
|
||||
[project]
|
||||
name = "kiwi"
|
||||
version = "0.10.0"
|
||||
version = "0.6.0"
|
||||
description = "Pantry tracking + leftover recipe suggestions"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.11"
|
||||
|
|
|
|||
|
|
@ -1,218 +0,0 @@
|
|||
"""Ingest Purple Carrot scraped recipes into the Kiwi corpus database.
|
||||
|
||||
Reads recipes_purplecarrot_live.parquet (output of scrape_live.py) and
|
||||
upserts into the shared recipes table, setting source='purplecarrot' and
|
||||
using the recipe slug as the external_id (prefixed pc_).
|
||||
|
||||
Run after each weekly_harvest.sh scrape:
|
||||
|
||||
conda run -n cf python3 scripts/pipeline/ingest_purplecarrot.py \
|
||||
[--db /Library/Assets/kiwi/kiwi.db] \
|
||||
[--parquet /Library/Assets/kiwi/pipeline/recipes_purplecarrot_live.parquet]
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
|
||||
import math
|
||||
import re
|
||||
|
||||
import pandas as pd
|
||||
|
||||
# ── Helpers (inlined from build_recipe_index to avoid cross-module import) ─────
|
||||
|
||||
_MEASURE_PATTERN = re.compile(
|
||||
r"^\d[\d\s/¼½¾⅓⅔]*\s*(cup|tbsp|tsp|oz|lb|g|kg|ml|l|clove|slice|piece|can|pkg|package|bunch|head|stalk|sprig|pinch|dash|to taste|as needed)s?\b",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
_LEAD_NUMBER = re.compile(r"^\d[\d\s/¼½¾⅓⅔]*\s*")
|
||||
_TRAILING_QUALIFIER = re.compile(
|
||||
r"\s*(to taste|as needed|or more|or less|optional|if desired|if needed)\s*$",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
|
||||
def _float_or_none(val: object) -> float | None:
|
||||
try:
|
||||
v = float(val) # type: ignore[arg-type]
|
||||
return v if v > 0 else None
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
|
||||
|
||||
def _safe_list(val: object) -> list:
|
||||
if val is None:
|
||||
return []
|
||||
if isinstance(val, float) and math.isnan(val):
|
||||
return []
|
||||
if isinstance(val, list):
|
||||
return val
|
||||
# Parquet often deserializes list columns as numpy arrays
|
||||
try:
|
||||
import numpy as np
|
||||
if isinstance(val, np.ndarray):
|
||||
return val.tolist()
|
||||
except ImportError:
|
||||
pass
|
||||
return []
|
||||
|
||||
|
||||
def _extract_ingredient_names(raw_list: list[str]) -> list[str]:
|
||||
names = []
|
||||
for raw in raw_list:
|
||||
s = raw.lower().strip()
|
||||
s = _MEASURE_PATTERN.sub("", s)
|
||||
s = _LEAD_NUMBER.sub("", s)
|
||||
s = re.sub(r"\(.*?\)", "", s)
|
||||
s = re.sub(r",.*$", "", s)
|
||||
s = _TRAILING_QUALIFIER.sub("", s)
|
||||
s = s.strip(" -.,")
|
||||
if s and len(s) > 1:
|
||||
names.append(s)
|
||||
return names
|
||||
|
||||
|
||||
def _compute_element_coverage(profiles: list[dict]) -> dict[str, float]:
|
||||
counts: dict[str, int] = {}
|
||||
for p in profiles:
|
||||
for elem in p.get("elements", []):
|
||||
counts[elem] = counts.get(elem, 0) + 1
|
||||
if not profiles:
|
||||
return {}
|
||||
return {e: round(c / len(profiles), 3) for e, c in counts.items()}
|
||||
|
||||
# ── Config ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
DEFAULT_DB = Path("/Library/Assets/kiwi/kiwi.db")
|
||||
DEFAULT_PARQUET = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot_live.parquet")
|
||||
|
||||
|
||||
# ── Ingest ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
def ingest(db_path: Path, parquet_path: Path) -> None:
|
||||
df = pd.read_parquet(parquet_path)
|
||||
|
||||
# Filter to rows with full recipe data
|
||||
if "HasFullRecipe" in df.columns:
|
||||
df = df[df["HasFullRecipe"] == True].copy()
|
||||
|
||||
if df.empty:
|
||||
print("No full recipes found in parquet — nothing to ingest.")
|
||||
return
|
||||
|
||||
print(f"Ingesting {len(df)} Purple Carrot recipes into {db_path} …")
|
||||
|
||||
conn = sqlite3.connect(db_path)
|
||||
try:
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
|
||||
# Pre-load ingredient element profiles for coverage calculation
|
||||
profile_index: dict[str, list[str]] = {}
|
||||
for row in conn.execute("SELECT name, elements FROM ingredient_profiles"):
|
||||
try:
|
||||
profile_index[row[0]] = json.loads(row[1])
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
inserted = updated = 0
|
||||
|
||||
for _, row in df.iterrows():
|
||||
slug = str(row.get("Slug", "")).strip()
|
||||
if not slug:
|
||||
continue
|
||||
|
||||
external_id = f"pc_{slug}"
|
||||
title = str(row.get("Name", "")).strip()[:500]
|
||||
if not title:
|
||||
continue
|
||||
|
||||
raw_ingredients = [str(i) for i in _safe_list(row.get("RecipeIngredientParts", []))]
|
||||
directions = [str(d) for d in _safe_list(row.get("RecipeInstructions", []))]
|
||||
|
||||
ingredient_names = _extract_ingredient_names(raw_ingredients)
|
||||
profiles = [
|
||||
{"elements": profile_index[n]}
|
||||
for n in ingredient_names if n in profile_index
|
||||
]
|
||||
coverage = _compute_element_coverage(profiles)
|
||||
|
||||
# Keywords: merge scraped tags with allergen info
|
||||
kw_raw = _safe_list(row.get("Keywords", []))
|
||||
allergens = str(row.get("Allergens", "") or "")
|
||||
if allergens:
|
||||
kw_raw = list(kw_raw) + [f"allergen:{a.strip()}" for a in allergens.split(",") if a.strip()]
|
||||
keywords_json = json.dumps(kw_raw)
|
||||
|
||||
# Check if already present (same external_id)
|
||||
existing = conn.execute(
|
||||
"SELECT id FROM recipes WHERE external_id = ?", (external_id,)
|
||||
).fetchone()
|
||||
|
||||
params = (
|
||||
title,
|
||||
json.dumps(raw_ingredients),
|
||||
json.dumps(ingredient_names),
|
||||
json.dumps(directions),
|
||||
"meal-kit", # category
|
||||
keywords_json,
|
||||
_float_or_none(row.get("Calories")),
|
||||
_float_or_none(row.get("FatContent")),
|
||||
_float_or_none(row.get("ProteinContent")),
|
||||
None, # sodium_mg — not scraped
|
||||
json.dumps(coverage),
|
||||
None, # sugar_g — not scraped
|
||||
_float_or_none(row.get("CarbohydrateContent")),
|
||||
_float_or_none(row.get("FiberContent")),
|
||||
2.0, # servings — PC meal kits are 2-serving by default
|
||||
0, # nutrition_estimated — PC provides real data
|
||||
)
|
||||
|
||||
if existing:
|
||||
conn.execute("""
|
||||
UPDATE recipes
|
||||
SET title=?, ingredients=?, ingredient_names=?, directions=?,
|
||||
category=?, keywords=?, calories=?, fat_g=?, protein_g=?,
|
||||
sodium_mg=?, element_coverage=?,
|
||||
sugar_g=?, carbs_g=?, fiber_g=?, servings=?, nutrition_estimated=?
|
||||
WHERE external_id=?
|
||||
""", params + (external_id,))
|
||||
updated += 1
|
||||
else:
|
||||
conn.execute("""
|
||||
INSERT INTO recipes
|
||||
(external_id, source, title, ingredients, ingredient_names,
|
||||
directions, category, keywords, calories, fat_g, protein_g,
|
||||
sodium_mg, element_coverage,
|
||||
sugar_g, carbs_g, fiber_g, servings, nutrition_estimated)
|
||||
VALUES (?, 'purplecarrot', ?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
|
||||
""", (external_id,) + params)
|
||||
inserted += 1
|
||||
|
||||
conn.commit()
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
print(f"Done — {inserted} inserted, {updated} updated")
|
||||
|
||||
|
||||
# ── Main ───────────────────────────────────────────────────────────────────────
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--db", type=Path, default=DEFAULT_DB)
|
||||
parser.add_argument("--parquet", type=Path, default=DEFAULT_PARQUET)
|
||||
args = parser.parse_args()
|
||||
|
||||
if not args.parquet.exists():
|
||||
print(f"ERROR: parquet not found at {args.parquet}")
|
||||
raise SystemExit(1)
|
||||
|
||||
ingest(args.db, args.parquet)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
@ -1,68 +0,0 @@
|
|||
"""
|
||||
Pipeline logging utility.
|
||||
|
||||
Adds a structured JSON FileHandler to the root logger so every pipeline
|
||||
script automatically writes machine-readable logs to the shared datastore
|
||||
at /Library/Assets/logs/pipeline/. Avocet ingests these for Turnstone
|
||||
logreading training (kiwi#141 / avocet#67).
|
||||
|
||||
Usage (add near the top of main() after logging.basicConfig):
|
||||
|
||||
from scripts.pipeline.log_utils import attach_pipeline_log
|
||||
attach_pipeline_log("scrape_recipes")
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
PIPELINE_LOG_DIR = Path(
|
||||
os.environ.get("PIPELINE_LOG_DIR", "/Library/Assets/logs/pipeline")
|
||||
)
|
||||
|
||||
|
||||
class _JsonFormatter(logging.Formatter):
|
||||
def format(self, record: logging.LogRecord) -> str:
|
||||
payload: dict = {
|
||||
"ts": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
|
||||
"level": record.levelname,
|
||||
"logger": record.name,
|
||||
"msg": record.getMessage(),
|
||||
}
|
||||
if record.exc_info:
|
||||
payload["exc"] = self.formatException(record.exc_info)
|
||||
# Any extra kwargs passed via logger.info("...", extra={...})
|
||||
standard = {
|
||||
"name", "msg", "args", "levelname", "levelno", "pathname",
|
||||
"filename", "module", "exc_info", "exc_text", "stack_info",
|
||||
"lineno", "funcName", "created", "msecs", "relativeCreated",
|
||||
"thread", "threadName", "processName", "process", "message",
|
||||
"taskName",
|
||||
}
|
||||
extra = {k: v for k, v in record.__dict__.items() if k not in standard}
|
||||
if extra:
|
||||
payload["extra"] = extra
|
||||
return json.dumps(payload)
|
||||
|
||||
|
||||
def attach_pipeline_log(script_name: str) -> Path:
|
||||
"""Attach a JSON file handler to the root logger for pipeline logging.
|
||||
|
||||
Returns the path of the log file created.
|
||||
"""
|
||||
PIPELINE_LOG_DIR.mkdir(parents=True, exist_ok=True)
|
||||
ts = datetime.now(tz=timezone.utc).strftime("%Y%m%dT%H%M%S")
|
||||
log_path = PIPELINE_LOG_DIR / f"{script_name}_{ts}.jsonl"
|
||||
|
||||
handler = logging.FileHandler(log_path, encoding="utf-8")
|
||||
handler.setLevel(logging.DEBUG)
|
||||
handler.setFormatter(_JsonFormatter())
|
||||
logging.getLogger().addHandler(handler)
|
||||
|
||||
logging.getLogger(__name__).info(
|
||||
"Pipeline log: %s", log_path, extra={"script": script_name}
|
||||
)
|
||||
return log_path
|
||||
|
|
@ -1,120 +0,0 @@
|
|||
"""Discover Purple Carrot's current weekly menu recipe slugs.
|
||||
|
||||
The main /plant-based-recipes listing page always renders the current week's
|
||||
menu as server-side HTML. This script pulls those slugs and writes them to a
|
||||
parquet that can be passed directly to scrape_live.py via --slugs-from.
|
||||
|
||||
Run weekly (e.g. via cron) to accumulate new recipes as the menu rotates.
|
||||
|
||||
Usage:
|
||||
conda run -n cf python3 scripts/pipeline/purple_carrot/discover_current_menu.py \
|
||||
[--out /Library/Assets/kiwi/pipeline/recipes_purplecarrot_menu.parquet]
|
||||
|
||||
Then scrape:
|
||||
conda run -n cf python3 scripts/pipeline/purple_carrot/scrape_live.py \
|
||||
--slugs-from /Library/Assets/kiwi/pipeline/recipes_purplecarrot_menu.parquet \
|
||||
--out /Library/Assets/kiwi/pipeline/recipes_purplecarrot_live.parquet \
|
||||
--resume
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
import sys
|
||||
from datetime import date
|
||||
from pathlib import Path
|
||||
|
||||
import pandas as pd
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
# ── Config ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
LISTING_URL = "https://www.purplecarrot.com/plant-based-recipes"
|
||||
BASE_URL = "https://www.purplecarrot.com/recipe/{slug}"
|
||||
|
||||
DEFAULT_OUT = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot_menu.parquet")
|
||||
|
||||
HEADERS = {
|
||||
"User-Agent": (
|
||||
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
|
||||
"(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
|
||||
),
|
||||
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
|
||||
"Accept-Language": "en-US,en;q=0.5",
|
||||
}
|
||||
|
||||
RECIPE_HREF_RE = re.compile(r"/recipe/([^?#]+)")
|
||||
|
||||
|
||||
# ── Main ───────────────────────────────────────────────────────────────────────
|
||||
|
||||
def discover_current_slugs() -> list[str]:
|
||||
"""Fetch the listing page and return unique recipe slugs from the current menu."""
|
||||
resp = requests.get(LISTING_URL, headers=HEADERS, timeout=15)
|
||||
if resp.status_code != 200:
|
||||
print(f"ERROR: listing page returned HTTP {resp.status_code}", file=sys.stderr)
|
||||
return []
|
||||
|
||||
soup = BeautifulSoup(resp.text, "html.parser")
|
||||
slugs: list[str] = []
|
||||
seen: set[str] = set()
|
||||
for a in soup.find_all("a", href=RECIPE_HREF_RE):
|
||||
m = RECIPE_HREF_RE.search(a["href"])
|
||||
if m:
|
||||
slug = m.group(1)
|
||||
if slug not in seen:
|
||||
seen.add(slug)
|
||||
slugs.append(slug)
|
||||
return slugs
|
||||
|
||||
|
||||
def main() -> None:
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--out", type=Path, default=DEFAULT_OUT)
|
||||
args = parser.parse_args()
|
||||
|
||||
print(f"Fetching current menu from {LISTING_URL} …")
|
||||
slugs = discover_current_slugs()
|
||||
|
||||
if not slugs:
|
||||
print("No slugs found — the listing page may have changed structure or blocked the request.")
|
||||
sys.exit(1)
|
||||
|
||||
today = date.today().isoformat()
|
||||
records = [
|
||||
{
|
||||
"Slug": slug,
|
||||
"SourceURL": BASE_URL.format(slug=slug),
|
||||
"Source": "purplecarrot_menu",
|
||||
"DiscoveredDate": today,
|
||||
}
|
||||
for slug in slugs
|
||||
]
|
||||
|
||||
# Merge with any existing menu parquet (accumulate weeks)
|
||||
df_new = pd.DataFrame(records)
|
||||
args.out.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
if args.out.exists():
|
||||
df_prev = pd.read_parquet(args.out)
|
||||
combined = pd.concat([df_prev, df_new], ignore_index=True)
|
||||
combined = combined.drop_duplicates(subset=["Slug"], keep="first")
|
||||
df_new = combined
|
||||
|
||||
df_new.to_parquet(args.out, index=False)
|
||||
|
||||
print(f"Found {len(slugs)} current-menu slugs this week:")
|
||||
for s in slugs:
|
||||
print(f" {s}")
|
||||
print(f"\nSaved {len(df_new)} total slugs (accumulated) to {args.out}")
|
||||
print(f"\nTo scrape full recipes:")
|
||||
print(f" conda run -n cf python3 scripts/pipeline/purple_carrot/scrape_live.py \\")
|
||||
print(f" --slugs-from {args.out} \\")
|
||||
print(f" --out /Library/Assets/kiwi/pipeline/recipes_purplecarrot_live.parquet \\")
|
||||
print(f" --resume")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
@ -1,218 +0,0 @@
|
|||
"""Discover Purple Carrot recipe slugs by crawling all recipe-category listing pages.
|
||||
|
||||
The site serves full server-rendered HTML for category pages, paginated via
|
||||
?page=N. Each page loads 18 recipe cards. This script crawls every category
|
||||
across all pages and writes a deduplicated slug inventory.
|
||||
|
||||
Usage:
|
||||
conda run -n cf python3 scripts/pipeline/purple_carrot/discover_slugs_categories.py \
|
||||
[--out /Library/Assets/kiwi/pipeline/recipes_purplecarrot_slugs.parquet] \
|
||||
[--delay 2.0] \
|
||||
[--max-pages 50] # safety cap per category (comfort-foods has ~18)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import re
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import pandas as pd
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
# ── Config ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
BASE = "https://www.purplecarrot.com"
|
||||
|
||||
# All known category slugs (from /plant-based-recipes nav)
|
||||
CATEGORIES: list[str] = [
|
||||
"comfort-foods",
|
||||
"family-friendly",
|
||||
"healthy-desserts",
|
||||
"holiday-recipes",
|
||||
"quick-and-easy",
|
||||
"party-foods",
|
||||
"seasonal-menu",
|
||||
"spring-recipes",
|
||||
"summer-recipes",
|
||||
"fall-recipes",
|
||||
"winter-recipes",
|
||||
"african",
|
||||
"american",
|
||||
"asian",
|
||||
"comfort",
|
||||
"french",
|
||||
"indian",
|
||||
"italian",
|
||||
"mediterranean",
|
||||
"mexican",
|
||||
"middle-eastern",
|
||||
"soups",
|
||||
"salads",
|
||||
"bowls",
|
||||
"pasta",
|
||||
"sandwiches-wraps",
|
||||
"tacos",
|
||||
"breakfast",
|
||||
"snacks-sides",
|
||||
]
|
||||
|
||||
DEFAULT_OUT = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot_slugs.parquet")
|
||||
EXISTING_PARQUET = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot.parquet")
|
||||
|
||||
RECIPE_LINK_SELECTOR = "a.c-recipe__title"
|
||||
SLUG_RE = re.compile(r"/recipe/([^?#]+)")
|
||||
|
||||
HEADERS = {
|
||||
"User-Agent": (
|
||||
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
|
||||
"(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
|
||||
),
|
||||
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
|
||||
"Accept-Language": "en-US,en;q=0.5",
|
||||
}
|
||||
|
||||
|
||||
# ── Helpers ────────────────────────────────────────────────────────────────────
|
||||
|
||||
def _fetch_html(url: str, session: requests.Session) -> str | None:
|
||||
"""Fetch URL and return HTML string, or None on failure."""
|
||||
try:
|
||||
resp = session.get(url, headers=HEADERS, timeout=15)
|
||||
if resp.status_code == 200:
|
||||
return resp.text
|
||||
if resp.status_code == 404:
|
||||
return None # expected end of pagination
|
||||
print(f" HTTP {resp.status_code} — {url}")
|
||||
return None
|
||||
except Exception as exc:
|
||||
print(f" ERROR fetching {url}: {exc}")
|
||||
return None
|
||||
|
||||
|
||||
def _extract_slugs(html: str) -> list[str]:
|
||||
"""Pull recipe slugs from one listing-page HTML response."""
|
||||
soup = BeautifulSoup(html, "html.parser")
|
||||
slugs: list[str] = []
|
||||
for a in soup.select(RECIPE_LINK_SELECTOR):
|
||||
href = a.get("href", "")
|
||||
m = SLUG_RE.search(href)
|
||||
if m:
|
||||
slugs.append(m.group(1))
|
||||
return slugs
|
||||
|
||||
|
||||
def _get_category_total(html: str) -> int | None:
|
||||
"""Try to parse the recipe count shown on the category page (e.g. '319 Recipes')."""
|
||||
m = re.search(r"(\d+)\s+Recipes?\b", html)
|
||||
return int(m.group(1)) if m else None
|
||||
|
||||
|
||||
def _discover_category(
|
||||
category: str,
|
||||
session: requests.Session,
|
||||
delay: float,
|
||||
max_pages: int,
|
||||
) -> tuple[list[str], int]:
|
||||
"""Crawl all pages of a category, return (slugs, pages_fetched)."""
|
||||
slugs: list[str] = []
|
||||
for page_num in range(1, max_pages + 1):
|
||||
if page_num == 1:
|
||||
url = f"{BASE}/recipe-categories/{category}"
|
||||
else:
|
||||
url = f"{BASE}/recipe-categories/{category}?page={page_num}"
|
||||
|
||||
html = _fetch_html(url, session)
|
||||
if html is None:
|
||||
break # 404 or error = past the end
|
||||
|
||||
page_slugs = _extract_slugs(html)
|
||||
if not page_slugs:
|
||||
# Show total if we got a page but no links (category slug may be wrong)
|
||||
if page_num == 1:
|
||||
total = _get_category_total(html)
|
||||
if total is not None:
|
||||
print(f" page 1 loaded (total={total}) but 0 recipe links — selector may need updating")
|
||||
break
|
||||
|
||||
slugs.extend(page_slugs)
|
||||
|
||||
# Print progress
|
||||
total_hint = _get_category_total(html) if page_num == 1 else None
|
||||
total_str = f" / {total_hint}" if total_hint else ""
|
||||
print(f" page {page_num}: +{len(page_slugs)} slugs ({len(slugs)}{total_str} cumulative)")
|
||||
|
||||
if len(page_slugs) < 18:
|
||||
# Short page = last page
|
||||
break
|
||||
|
||||
time.sleep(delay)
|
||||
|
||||
return slugs, (len(slugs) + 17) // 18 # approximate pages
|
||||
|
||||
|
||||
# ── Main ───────────────────────────────────────────────────────────────────────
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--out", type=Path, default=DEFAULT_OUT)
|
||||
parser.add_argument("--delay", type=float, default=2.0,
|
||||
help="Seconds between page requests")
|
||||
parser.add_argument("--max-pages", type=int, default=50,
|
||||
help="Safety cap on pages per category")
|
||||
parser.add_argument("--categories", nargs="*",
|
||||
help="Crawl only these category slugs (default: all)")
|
||||
args = parser.parse_args()
|
||||
|
||||
categories = args.categories or CATEGORIES
|
||||
|
||||
# Seed with any slugs from the Wayback parquet
|
||||
known_slugs: set[str] = set()
|
||||
if EXISTING_PARQUET.exists():
|
||||
df_wb = pd.read_parquet(EXISTING_PARQUET)
|
||||
known_slugs = set(df_wb["Slug"].dropna().tolist())
|
||||
print(f"Seeded with {len(known_slugs)} slugs from Wayback parquet")
|
||||
|
||||
all_records: list[dict[str, Any]] = []
|
||||
session = requests.Session()
|
||||
|
||||
for category in categories:
|
||||
print(f"\n[{category}]")
|
||||
cat_slugs, pages = _discover_category(category, session, args.delay, args.max_pages)
|
||||
for slug in cat_slugs:
|
||||
all_records.append({"Slug": slug, "Category": category, "Source": "purplecarrot_category"})
|
||||
print(f" → {len(cat_slugs)} slugs across ~{pages} pages")
|
||||
time.sleep(args.delay)
|
||||
|
||||
if not all_records:
|
||||
print("\nNo records found — check that categories are correct and the site is accessible")
|
||||
return
|
||||
|
||||
# Deduplicate keeping first category encountered
|
||||
df_new = pd.DataFrame(all_records)
|
||||
df_new = df_new.drop_duplicates(subset=["Slug"], keep="first")
|
||||
|
||||
# Also include Wayback slugs not already in the new set
|
||||
if known_slugs:
|
||||
wb_only = known_slugs - set(df_new["Slug"].tolist())
|
||||
if wb_only:
|
||||
df_wb_extra = pd.DataFrame([
|
||||
{"Slug": s, "Category": "wayback", "Source": "purplecarrot_wayback"}
|
||||
for s in wb_only
|
||||
])
|
||||
df_new = pd.concat([df_new, df_wb_extra], ignore_index=True)
|
||||
|
||||
args.out.parent.mkdir(parents=True, exist_ok=True)
|
||||
df_new.to_parquet(args.out, index=False)
|
||||
|
||||
new_count = len(df_new)
|
||||
cat_count = len(df_new[df_new["Source"] == "purplecarrot_category"])
|
||||
print(f"\nDone — {new_count} total slugs saved to {args.out}")
|
||||
print(f" {cat_count} from category pages, {new_count - cat_count} from Wayback only")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
@ -1,301 +0,0 @@
|
|||
"""
|
||||
discover_wayback.py — enumerate Purple Carrot recipe slugs via the Wayback Machine.
|
||||
|
||||
Strategy:
|
||||
1. CDX API → all archived /api/v2/menus/* URLs (multiple timestamps)
|
||||
2. Replay → fetch each menu's menuItems, extract productPath slugs
|
||||
3. CDX API → all archived /api/v1/products/* URLs (direct slug capture)
|
||||
4. CDX API → /recipe-categories/* HTML pages for older slugs
|
||||
5. Deduplicate and write manifest to OUT_FILE
|
||||
|
||||
Output (JSONL, one record per recipe):
|
||||
{"slug": "...", "title": "...", "subtitle": "...", "cook_time": "...",
|
||||
"tags": [...], "serving_size": 2, "image_url": "...",
|
||||
"wayback_ts": "20260412150557", "source": "menu|product_api|category_page"}
|
||||
|
||||
Usage:
|
||||
conda run -n cf python -m scripts.pipeline.purple_carrot.discover_wayback
|
||||
conda run -n cf python -m scripts.pipeline.purple_carrot.discover_wayback --out /Library/Assets/kiwi/pipeline/pc_slugs.jsonl
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from urllib.parse import urlencode
|
||||
|
||||
import requests
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
CDX_BASE = "https://web.archive.org/cdx/search/cdx"
|
||||
WB_BASE = "https://web.archive.org/web"
|
||||
PC_HOST = "www.purplecarrot.com"
|
||||
|
||||
# Polite delay between Wayback replay fetches (seconds)
|
||||
REPLAY_DELAY = 1.0
|
||||
CDX_DELAY = 0.5
|
||||
|
||||
DEFAULT_OUT = Path("/Library/Assets/kiwi/pipeline/pc_slugs.jsonl")
|
||||
|
||||
|
||||
# ── CDX helpers ───────────────────────────────────────────────────────────────
|
||||
|
||||
def cdx_query(url_pattern: str, **kwargs) -> list[dict]:
|
||||
"""Run a CDX search and return a list of result dicts."""
|
||||
params = {
|
||||
"url": url_pattern,
|
||||
"output": "json",
|
||||
"fl": "original,timestamp,statuscode",
|
||||
"collapse": "urlkey",
|
||||
"filter": "statuscode:200",
|
||||
**kwargs,
|
||||
}
|
||||
for attempt in range(3):
|
||||
try:
|
||||
resp = requests.get(CDX_BASE, params=params, timeout=30)
|
||||
resp.raise_for_status()
|
||||
rows = resp.json()
|
||||
if not rows or len(rows) < 2:
|
||||
return []
|
||||
headers = rows[0]
|
||||
return [dict(zip(headers, row)) for row in rows[1:]]
|
||||
except Exception as exc:
|
||||
logger.warning("CDX attempt %d failed: %s", attempt + 1, exc)
|
||||
time.sleep(2 ** attempt)
|
||||
return []
|
||||
|
||||
|
||||
def wayback_get(url: str, timestamp: str) -> Any | None:
|
||||
"""Fetch a Wayback replay of a URL and return parsed JSON (or None)."""
|
||||
replay_url = f"{WB_BASE}/{timestamp}/{url}"
|
||||
for attempt in range(3):
|
||||
try:
|
||||
resp = requests.get(replay_url, timeout=30)
|
||||
if resp.status_code == 200:
|
||||
return resp.json()
|
||||
if resp.status_code == 404:
|
||||
return None
|
||||
except Exception as exc:
|
||||
logger.warning("Wayback GET attempt %d failed for %s: %s", attempt + 1, url, exc)
|
||||
time.sleep(2 ** attempt)
|
||||
return None
|
||||
|
||||
|
||||
# ── Slug extraction ───────────────────────────────────────────────────────────
|
||||
|
||||
def slug_from_product_path(path: str) -> str | None:
|
||||
"""'/recipe/foo-bar-baz' → 'foo-bar-baz'."""
|
||||
if not path:
|
||||
return None
|
||||
return path.strip("/").split("/")[-1] or None
|
||||
|
||||
|
||||
def _menu_item_to_record(item: dict, wayback_ts: str) -> dict | None:
|
||||
slug = slug_from_product_path(item.get("productPath", ""))
|
||||
if not slug:
|
||||
return None
|
||||
return {
|
||||
"slug": slug,
|
||||
"title": item.get("title", ""),
|
||||
"subtitle": item.get("subtitle", ""),
|
||||
"cook_time": item.get("cookTime", ""),
|
||||
"tags": item.get("filterTags") or [],
|
||||
"serving_size": item.get("servingSize"),
|
||||
"image_url": item.get("imageURL", ""),
|
||||
"description": item.get("description", ""),
|
||||
"wayback_ts": wayback_ts,
|
||||
"source": "menu",
|
||||
}
|
||||
|
||||
|
||||
# ── Discovery passes ──────────────────────────────────────────────────────────
|
||||
|
||||
def pass_menus(seen_slugs: set[str]) -> list[dict]:
|
||||
"""Walk all archived /api/v2/menus/* captures to extract slugs."""
|
||||
records: list[dict] = []
|
||||
|
||||
# Find all distinct archived menu URLs
|
||||
menu_cdx = cdx_query(f"{PC_HOST}/api/v2/menus/*", limit="500")
|
||||
logger.info("CDX: %d archived menu URLs found", len(menu_cdx))
|
||||
time.sleep(CDX_DELAY)
|
||||
|
||||
processed_menu_ids: set[str] = set()
|
||||
|
||||
for entry in menu_cdx:
|
||||
url = entry["original"]
|
||||
ts = entry["timestamp"]
|
||||
|
||||
# Skip the listing endpoint, only process individual menus
|
||||
if not url.split("?")[0].rstrip("/").split("/")[-1].isdigit():
|
||||
continue
|
||||
|
||||
menu_id = url.split("?")[0].rstrip("/").split("/")[-1]
|
||||
if menu_id in processed_menu_ids:
|
||||
continue
|
||||
processed_menu_ids.add(menu_id)
|
||||
|
||||
logger.info("Fetching menu %s (ts=%s) ...", menu_id, ts)
|
||||
data = wayback_get(url.split("?")[0] + "?logged_out=true", ts)
|
||||
time.sleep(REPLAY_DELAY)
|
||||
|
||||
if not data or "menuItems" not in data:
|
||||
continue
|
||||
|
||||
for item in data["menuItems"]:
|
||||
rec = _menu_item_to_record(item, ts)
|
||||
if rec and rec["slug"] not in seen_slugs:
|
||||
seen_slugs.add(rec["slug"])
|
||||
records.append(rec)
|
||||
logger.debug(" + %s", rec["slug"])
|
||||
|
||||
logger.info(" %d new slugs (total so far: %d)", len(records), len(seen_slugs))
|
||||
|
||||
return records
|
||||
|
||||
|
||||
def pass_product_api(seen_slugs: set[str]) -> list[dict]:
|
||||
"""Pick up any directly archived /api/v1/products/* URLs the menu pass missed."""
|
||||
records: list[dict] = []
|
||||
|
||||
product_cdx = cdx_query(f"{PC_HOST}/api/v1/products/*", limit="5000")
|
||||
logger.info("CDX: %d archived product API URLs found", len(product_cdx))
|
||||
time.sleep(CDX_DELAY)
|
||||
|
||||
for entry in product_cdx:
|
||||
slug = entry["original"].rstrip("/").split("/")[-1]
|
||||
if not slug or slug in seen_slugs:
|
||||
continue
|
||||
seen_slugs.add(slug)
|
||||
records.append({
|
||||
"slug": slug,
|
||||
"title": "",
|
||||
"subtitle": "",
|
||||
"cook_time": "",
|
||||
"tags": [],
|
||||
"serving_size": None,
|
||||
"image_url": "",
|
||||
"description": "",
|
||||
"wayback_ts": entry["timestamp"],
|
||||
"source": "product_api",
|
||||
})
|
||||
|
||||
logger.info("product_api pass: %d new slugs", len(records))
|
||||
return records
|
||||
|
||||
|
||||
def pass_category_pages(seen_slugs: set[str]) -> list[dict]:
|
||||
"""Parse archived recipe-categories HTML pages for slugs not in the API.
|
||||
|
||||
Category pages are rendered SSR/with inline JSON state on older captures,
|
||||
so we do a simple regex scan for /recipe/<slug> patterns.
|
||||
"""
|
||||
import re
|
||||
|
||||
records: list[dict] = []
|
||||
SLUG_RE = re.compile(r'["\s]/recipe/([a-z0-9][a-z0-9\-]{3,})["\s/?]')
|
||||
|
||||
cat_cdx = cdx_query(f"{PC_HOST}/recipe-categories/*", limit="200")
|
||||
logger.info("CDX: %d archived category pages found", len(cat_cdx))
|
||||
time.sleep(CDX_DELAY)
|
||||
|
||||
seen_category_urls: set[str] = set()
|
||||
|
||||
for entry in cat_cdx:
|
||||
url = entry["original"].split("?")[0]
|
||||
if url in seen_category_urls:
|
||||
continue
|
||||
seen_category_urls.add(url)
|
||||
|
||||
replay_url = f"{WB_BASE}/{entry['timestamp']}/{url}"
|
||||
try:
|
||||
resp = requests.get(replay_url, timeout=30)
|
||||
time.sleep(REPLAY_DELAY)
|
||||
if resp.status_code != 200:
|
||||
continue
|
||||
except Exception as exc:
|
||||
logger.warning("Category page fetch failed: %s", exc)
|
||||
continue
|
||||
|
||||
for slug in SLUG_RE.findall(resp.text):
|
||||
if slug in seen_slugs:
|
||||
continue
|
||||
seen_slugs.add(slug)
|
||||
records.append({
|
||||
"slug": slug,
|
||||
"title": "",
|
||||
"subtitle": "",
|
||||
"cook_time": "",
|
||||
"tags": [],
|
||||
"serving_size": None,
|
||||
"image_url": "",
|
||||
"description": "",
|
||||
"wayback_ts": entry["timestamp"],
|
||||
"source": "category_page",
|
||||
})
|
||||
|
||||
logger.info("category_pages pass: %d new slugs", len(records))
|
||||
return records
|
||||
|
||||
|
||||
# ── Main ──────────────────────────────────────────────────────────────────────
|
||||
|
||||
def discover(out_file: Path) -> None:
|
||||
seen: set[str] = set()
|
||||
|
||||
# Load previously discovered slugs so reruns are incremental
|
||||
existing: list[dict] = []
|
||||
if out_file.exists():
|
||||
with open(out_file) as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if line:
|
||||
rec = json.loads(line)
|
||||
seen.add(rec["slug"])
|
||||
existing.append(rec)
|
||||
logger.info("Loaded %d existing slugs from %s", len(seen), out_file)
|
||||
|
||||
new_records: list[dict] = []
|
||||
new_records += pass_menus(seen)
|
||||
new_records += pass_product_api(seen)
|
||||
new_records += pass_category_pages(seen)
|
||||
|
||||
out_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(out_file, "a") as f:
|
||||
for rec in new_records:
|
||||
f.write(json.dumps(rec) + "\n")
|
||||
|
||||
total = len(existing) + len(new_records)
|
||||
logger.info(
|
||||
"Done. %d new slugs written to %s (%d total).",
|
||||
len(new_records), out_file, total,
|
||||
)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description="Discover Purple Carrot recipe slugs via Wayback")
|
||||
parser.add_argument(
|
||||
"--out",
|
||||
type=Path,
|
||||
default=DEFAULT_OUT,
|
||||
help=f"Output JSONL manifest (default: {DEFAULT_OUT})",
|
||||
)
|
||||
parser.add_argument("--debug", action="store_true")
|
||||
args = parser.parse_args()
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG if args.debug else logging.INFO,
|
||||
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
|
||||
)
|
||||
|
||||
from scripts.pipeline.log_utils import attach_pipeline_log
|
||||
attach_pipeline_log("discover_wayback")
|
||||
|
||||
discover(args.out)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
@ -1,250 +0,0 @@
|
|||
"""Playwright scraper for live purplecarrot.com recipe pages.
|
||||
|
||||
Uses the slug inventory already in recipes_purplecarrot.parquet and fills in
|
||||
the missing ingredients/instructions by hitting the live site directly.
|
||||
|
||||
Usage:
|
||||
conda run -n cf python3 scripts/pipeline/purple_carrot/scrape_live.py \
|
||||
[--out /Library/Assets/kiwi/pipeline/recipes_purplecarrot_live.parquet] \
|
||||
[--delay 2.5] \
|
||||
[--limit 20]
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import re
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import pandas as pd
|
||||
from playwright.sync_api import sync_playwright, Page, TimeoutError as PWTimeout
|
||||
|
||||
# ── Config ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
BASE_URL = "https://www.purplecarrot.com/recipe/{slug}"
|
||||
DEFAULT_OUT = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot_live.parquet")
|
||||
EXISTING_PARQUET = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot.parquet")
|
||||
|
||||
RENDER_WAIT_MS = 2500 # JS render settle time
|
||||
NAV_TIMEOUT_MS = 20_000
|
||||
|
||||
|
||||
# ── Page parser ────────────────────────────────────────────────────────────────
|
||||
|
||||
def _text(page: Page, selector: str) -> str:
|
||||
el = page.query_selector(selector)
|
||||
return el.inner_text().strip() if el else ""
|
||||
|
||||
|
||||
def _texts(page: Page, selector: str) -> list[str]:
|
||||
return [el.inner_text().strip() for el in page.query_selector_all(selector)]
|
||||
|
||||
|
||||
def _parse_recipe(page: Page, slug: str, source_url: str) -> dict[str, Any] | None:
|
||||
"""Extract structured recipe data from the rendered page."""
|
||||
body = page.inner_text("body")
|
||||
|
||||
# Abort if we've been bounced to a generic listing / 404
|
||||
if "Page Not Found" in body or slug not in page.url:
|
||||
return None
|
||||
|
||||
# ── Title ──────────────────────────────────────────────────────────────────
|
||||
# The <h1> on product pages tends to be the recipe name
|
||||
title = (_text(page, "h1") or _text(page, "[class*='recipe-title']")).strip()
|
||||
if not title:
|
||||
# Fallback: first heading-like text before "Ingredients"
|
||||
idx = body.find("Ingredients\n")
|
||||
title = body[:idx].strip().splitlines()[-1] if idx > 0 else ""
|
||||
|
||||
# ── Ingredients / Instructions via body text ───────────────────────────────
|
||||
ing_start = body.find("\nIngredients\n")
|
||||
inst_start = body.find("\nInstructions\n")
|
||||
footer_start = body.find("\nShop\n") # footer sentinel
|
||||
|
||||
if ing_start == -1:
|
||||
return None # page didn't render recipe content
|
||||
|
||||
raw_ingredients: list[str] = []
|
||||
raw_instructions: list[str] = []
|
||||
|
||||
if ing_start != -1 and inst_start != -1:
|
||||
ing_block = body[ing_start + len("\nIngredients\n"):inst_start].strip()
|
||||
raw_ingredients = [l.strip() for l in ing_block.splitlines() if l.strip()]
|
||||
|
||||
if inst_start != -1:
|
||||
end = footer_start if footer_start > inst_start else len(body)
|
||||
inst_block = body[inst_start + len("\nInstructions\n"):end].strip()
|
||||
# Steps start with a digit
|
||||
steps: list[str] = []
|
||||
current: list[str] = []
|
||||
for line in inst_block.splitlines():
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
if re.match(r"^\d+$", line):
|
||||
if current:
|
||||
steps.append(" ".join(current))
|
||||
current = []
|
||||
elif line.startswith("CULINARY NOTES"):
|
||||
break
|
||||
else:
|
||||
current.append(line)
|
||||
if current:
|
||||
steps.append(" ".join(current))
|
||||
raw_instructions = steps
|
||||
|
||||
# ── Nutrition ──────────────────────────────────────────────────────────────
|
||||
def _extract_num(pattern: str) -> float | None:
|
||||
m = re.search(pattern, body)
|
||||
try:
|
||||
return float(m.group(1)) if m else None
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
cal = _extract_num(r"(\d+)\s*CAL")
|
||||
fat = _extract_num(r"(\d+(?:\.\d+)?)g\s*FAT")
|
||||
carbs = _extract_num(r"(\d+(?:\.\d+)?)g\s*CARBS")
|
||||
prot = _extract_num(r"(\d+(?:\.\d+)?)g\s*PROTEIN")
|
||||
fiber = _extract_num(r"(\d+(?:\.\d+)?)g\s*FIBER")
|
||||
|
||||
# ── Allergens / tags ───────────────────────────────────────────────────────
|
||||
allergen_m = re.search(r"Allergens?:\s*([^\n]+)", body)
|
||||
allergens = allergen_m.group(1).strip() if allergen_m else ""
|
||||
|
||||
# Feature tags like HIGH-PROTEIN, QUICK, etc. appear before Ingredients
|
||||
pre_ing = body[:ing_start]
|
||||
tags = re.findall(r"\b(HIGH-PROTEIN|QUICK|SPICY|LOW[\-\s]CALORIE|VEGAN|FAMILY\s+FRIENDLY)\b", pre_ing)
|
||||
|
||||
return {
|
||||
"Slug": slug,
|
||||
"Name": title,
|
||||
"SourceURL": source_url,
|
||||
"Source": "purplecarrot_live",
|
||||
"RecipeIngredientParts": raw_ingredients,
|
||||
"RecipeInstructions": raw_instructions,
|
||||
"Calories": cal,
|
||||
"FatContent": fat,
|
||||
"CarbohydrateContent": carbs,
|
||||
"ProteinContent": prot,
|
||||
"FiberContent": fiber,
|
||||
"Allergens": allergens,
|
||||
"Keywords": tags,
|
||||
"HasFullRecipe": bool(raw_ingredients and raw_instructions),
|
||||
}
|
||||
|
||||
|
||||
# ── Main ───────────────────────────────────────────────────────────────────────
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--out", type=Path, default=DEFAULT_OUT)
|
||||
parser.add_argument("--delay", type=float, default=2.5,
|
||||
help="Seconds between requests (be polite)")
|
||||
parser.add_argument("--limit", type=int, default=0,
|
||||
help="Stop after N slugs (0 = all)")
|
||||
parser.add_argument("--resume", action="store_true",
|
||||
help="Skip slugs already present in --out")
|
||||
parser.add_argument("--slugs-from", type=Path, default=None,
|
||||
help="Read slug inventory from this parquet instead of the default Wayback one")
|
||||
args = parser.parse_args()
|
||||
|
||||
# Load slug inventory — either from a custom parquet or the default Wayback run
|
||||
slugs_parquet = args.slugs_from if args.slugs_from else EXISTING_PARQUET
|
||||
df_existing = pd.read_parquet(slugs_parquet)
|
||||
slugs = df_existing["Slug"].dropna().unique().tolist()
|
||||
# source_urls may not be present in custom parcets — fall back to constructing from slug
|
||||
if "SourceURL" in df_existing.columns:
|
||||
source_urls = dict(zip(df_existing["Slug"], df_existing["SourceURL"]))
|
||||
else:
|
||||
source_urls = {s: BASE_URL.format(slug=s) for s in slugs}
|
||||
|
||||
# Resume support
|
||||
done_slugs: set[str] = set()
|
||||
if args.resume and args.out.exists():
|
||||
df_done = pd.read_parquet(args.out)
|
||||
done_slugs = set(df_done["Slug"].dropna().tolist())
|
||||
print(f"Resuming — {len(done_slugs)} slugs already scraped")
|
||||
|
||||
if args.limit:
|
||||
slugs = slugs[: args.limit]
|
||||
|
||||
results: list[dict[str, Any]] = []
|
||||
skipped = 0
|
||||
failed = 0
|
||||
|
||||
_UA = (
|
||||
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
|
||||
"(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
|
||||
)
|
||||
|
||||
with sync_playwright() as p:
|
||||
browser = p.chromium.launch(headless=True)
|
||||
|
||||
for i, slug in enumerate(slugs):
|
||||
if slug in done_slugs:
|
||||
skipped += 1
|
||||
continue
|
||||
|
||||
url = BASE_URL.format(slug=slug)
|
||||
print(f"[{i+1}/{len(slugs)}] {slug} … ", end="", flush=True)
|
||||
|
||||
# Use a fresh browser context per slug to avoid Cloudflare session-level
|
||||
# bot detection, which fires on the 2nd+ request in the same context.
|
||||
context = browser.new_context(
|
||||
user_agent=_UA,
|
||||
viewport={"width": 1280, "height": 900},
|
||||
)
|
||||
page = context.new_page()
|
||||
|
||||
try:
|
||||
page.goto(url, timeout=NAV_TIMEOUT_MS, wait_until="domcontentloaded")
|
||||
page.wait_for_timeout(RENDER_WAIT_MS)
|
||||
recipe = _parse_recipe(page, slug, source_urls.get(slug, url))
|
||||
except PWTimeout:
|
||||
print("TIMEOUT")
|
||||
failed += 1
|
||||
except Exception as exc:
|
||||
print(f"ERROR: {exc}")
|
||||
failed += 1
|
||||
else:
|
||||
if recipe is None:
|
||||
print("no content (404 or redirect)")
|
||||
failed += 1
|
||||
elif recipe["HasFullRecipe"]:
|
||||
n = len(recipe["RecipeIngredientParts"])
|
||||
s = len(recipe["RecipeInstructions"])
|
||||
print(f"OK ({n} ingredients, {s} steps)")
|
||||
results.append(recipe)
|
||||
else:
|
||||
print(f"partial (ings={len(recipe['RecipeIngredientParts'])}, steps={len(recipe['RecipeInstructions'])})")
|
||||
results.append(recipe)
|
||||
finally:
|
||||
context.close()
|
||||
|
||||
time.sleep(args.delay)
|
||||
|
||||
browser.close()
|
||||
|
||||
print(f"\nDone — {len(results)} scraped, {skipped} skipped, {failed} failed")
|
||||
|
||||
if results:
|
||||
df_out = pd.DataFrame(results)
|
||||
# Merge with existing metadata (nutrition stubs, wayback fields) for slugs
|
||||
# that didn't previously have full data
|
||||
args.out.parent.mkdir(parents=True, exist_ok=True)
|
||||
if args.resume and args.out.exists():
|
||||
df_prev = pd.read_parquet(args.out)
|
||||
df_out = pd.concat([df_prev, df_out], ignore_index=True)
|
||||
df_out = df_out.drop_duplicates(subset=["Slug"], keep="last")
|
||||
df_out.to_parquet(args.out, index=False)
|
||||
full_count = df_out["HasFullRecipe"].sum() if "HasFullRecipe" in df_out.columns else "?"
|
||||
print(f"Saved {len(df_out)} rows to {args.out} ({full_count} with full recipes)")
|
||||
else:
|
||||
print("No results — output not written")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
@ -1,538 +0,0 @@
|
|||
"""
|
||||
scrape_recipes.py — fetch full recipe data for slugs in pc_slugs.jsonl.
|
||||
|
||||
For each slug:
|
||||
1. Try Wayback /api/v1/products/<slug> — oldest capture first (pre-HelloFresh
|
||||
acquisition data is more complete).
|
||||
2. If instructions are empty, try the recipe HTML page via Wayback and parse
|
||||
inline JSON state or structured markup.
|
||||
3. Merge with metadata already in the manifest (title, tags, cook_time, etc.)
|
||||
4. Emit one row per recipe to recipes_purplecarrot.parquet in food.com columnar
|
||||
format so build_recipe_index.py can import it unchanged.
|
||||
|
||||
Output columns (food.com schema + PC extras ignored by the indexer):
|
||||
RecipeId, Name, Subtitle, RecipeIngredientParts, RecipeInstructions,
|
||||
RecipeCategory, Keywords, Calories, FatContent, ProteinContent,
|
||||
SodiumContent, SugarContent, CarbohydrateContent, FiberContent,
|
||||
RecipeServings, Description, ImageURL, CookTime, Slug, Source
|
||||
|
||||
Usage:
|
||||
conda run -n cf python -m scripts.pipeline.purple_carrot.scrape_recipes
|
||||
conda run -n cf python -m scripts.pipeline.purple_carrot.scrape_recipes \\
|
||||
--slugs /Library/Assets/kiwi/pipeline/pc_slugs.jsonl \\
|
||||
--out /Library/Assets/kiwi/pipeline/recipes_purplecarrot.parquet \\
|
||||
--resume
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import requests
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
CDX_BASE = "https://web.archive.org/cdx/search/cdx"
|
||||
WB_BASE = "https://web.archive.org/web"
|
||||
PC_HOST = "www.purplecarrot.com"
|
||||
|
||||
REPLAY_DELAY = 2.0
|
||||
CDX_DELAY = 3.0 # archive.org CDX rate-limits aggressively; be polite
|
||||
|
||||
DEFAULT_SLUGS = Path("/Library/Assets/kiwi/pipeline/pc_slugs.jsonl")
|
||||
DEFAULT_OUT = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot.parquet")
|
||||
|
||||
# Inline JSON state embedded by the SSR renderer — used as fallback HTML parser
|
||||
_NEXT_DATA_RE = re.compile(r'<script id="__NEXT_DATA__"[^>]*>(.*?)</script>', re.DOTALL)
|
||||
_REDUX_STATE_RE = re.compile(r'window\.__INITIAL_STATE__\s*=\s*(\{.*?\});\s*\n', re.DOTALL)
|
||||
|
||||
|
||||
# ── Wayback helpers ───────────────────────────────────────────────────────────
|
||||
|
||||
def _cdx_get(params: dict) -> list:
|
||||
"""CDX request with retry on 429/503 (archive.org rate-limits aggressively)."""
|
||||
for attempt in range(4):
|
||||
try:
|
||||
resp = requests.get(CDX_BASE, params=params, timeout=25)
|
||||
if resp.status_code in (429, 503):
|
||||
wait = 15 * (2 ** attempt)
|
||||
logger.debug("CDX %s — backing off %ds", resp.status_code, wait)
|
||||
time.sleep(wait)
|
||||
continue
|
||||
resp.raise_for_status()
|
||||
rows = resp.json()
|
||||
return rows if rows else []
|
||||
except Exception as exc:
|
||||
logger.debug("CDX attempt %d failed: %s", attempt + 1, exc)
|
||||
time.sleep(5 * (attempt + 1))
|
||||
return []
|
||||
|
||||
|
||||
def _cdx_timestamps(slug: str) -> list[str]:
|
||||
"""Return captured timestamps for a product slug, oldest first (pre-2022 window)."""
|
||||
rows = _cdx_get({
|
||||
"url": f"{PC_HOST}/api/v1/products/{slug}",
|
||||
"output": "json",
|
||||
"fl": "timestamp,statuscode",
|
||||
"filter": "statuscode:200",
|
||||
"limit": "20",
|
||||
# Pre-HelloFresh-acquisition captures (2019-2021) are most likely
|
||||
# to have full instructions — API stripped them post-acquisition.
|
||||
"from": "20190101",
|
||||
"to": "20211231",
|
||||
})
|
||||
if len(rows) < 2:
|
||||
return []
|
||||
return [row[0] for row in rows[1:]] # timestamps only, oldest first
|
||||
|
||||
|
||||
def _wayback_json(url: str, timestamp: str) -> Any | None:
|
||||
replay = f"{WB_BASE}/{timestamp}/{url}"
|
||||
for attempt in range(3):
|
||||
try:
|
||||
resp = requests.get(replay, timeout=30)
|
||||
if resp.status_code == 200:
|
||||
return resp.json()
|
||||
if resp.status_code in (404, 410):
|
||||
return None
|
||||
except Exception as exc:
|
||||
logger.debug("Wayback JSON attempt %d failed (%s): %s", attempt + 1, url, exc)
|
||||
time.sleep(2 ** attempt)
|
||||
return None
|
||||
|
||||
|
||||
def _wayback_html(url: str, timestamp: str) -> str | None:
|
||||
replay = f"{WB_BASE}/{timestamp}/{url}"
|
||||
for attempt in range(3):
|
||||
try:
|
||||
resp = requests.get(replay, timeout=30)
|
||||
if resp.status_code == 200:
|
||||
return resp.text
|
||||
if resp.status_code in (404, 410):
|
||||
return None
|
||||
except Exception as exc:
|
||||
logger.debug("Wayback HTML attempt %d failed (%s): %s", attempt + 1, url, exc)
|
||||
time.sleep(2 ** attempt)
|
||||
return None
|
||||
|
||||
|
||||
# ── Recipe extraction from API JSON ──────────────────────────────────────────
|
||||
|
||||
def _extract_from_api(data: dict) -> dict | None:
|
||||
"""Parse a /api/v1/products/<slug> response into our recipe dict.
|
||||
|
||||
Returns None if the response has no usable content (empty title, etc.).
|
||||
Returns a partial dict if only some fields are populated — caller merges
|
||||
with manifest metadata.
|
||||
"""
|
||||
if not data or not isinstance(data, dict):
|
||||
return None
|
||||
|
||||
title = data.get("title", "").strip()
|
||||
subtitle = data.get("subtitle", "").strip()
|
||||
slug = data.get("slug", "")
|
||||
|
||||
skus = data.get("skus") or []
|
||||
sku = skus[0] if skus else {}
|
||||
|
||||
# Instructions: list of {step_number, title, description}
|
||||
raw_instructions = sku.get("instructions") or []
|
||||
steps: list[str] = []
|
||||
for step in sorted(raw_instructions, key=lambda s: s.get("step_number", 0)):
|
||||
parts = []
|
||||
if step.get("title"):
|
||||
parts.append(step["title"])
|
||||
if step.get("description"):
|
||||
parts.append(step["description"])
|
||||
if parts:
|
||||
steps.append(". ".join(parts))
|
||||
|
||||
# Ingredients: may be in ingredients_quantity or ingredients
|
||||
raw_ingr = sku.get("ingredients_quantity") or sku.get("ingredients") or []
|
||||
ingredients: list[str] = []
|
||||
for item in raw_ingr:
|
||||
if isinstance(item, dict):
|
||||
qty = item.get("quantity") or item.get("qty") or ""
|
||||
unit = item.get("unit") or ""
|
||||
name = item.get("name") or item.get("ingredient", {}).get("name", "") if isinstance(item.get("ingredient"), dict) else item.get("ingredient_name", "")
|
||||
raw = item.get("raw") or item.get("display_name") or ""
|
||||
line = raw or " ".join(filter(None, [str(qty), str(unit), str(name)])).strip()
|
||||
if line:
|
||||
ingredients.append(line)
|
||||
elif isinstance(item, str) and item.strip():
|
||||
ingredients.append(item.strip())
|
||||
|
||||
nutrition = sku.get("nutrition_label") or {}
|
||||
calories = _num(nutrition.get("calories") or sku.get("calories"))
|
||||
fat = _num(nutrition.get("total_fat") or sku.get("fat"))
|
||||
protein = _num(nutrition.get("protein") or sku.get("protein"))
|
||||
sodium = _num(nutrition.get("sodium") or sku.get("sodium"))
|
||||
sugar = _num(nutrition.get("sugar") or nutrition.get("total_sugars"))
|
||||
carbs = _num(nutrition.get("total_carbohydrate") or sku.get("carbs"))
|
||||
fiber = _num(nutrition.get("dietary_fiber") or sku.get("fiber"))
|
||||
|
||||
tags = sku.get("tags") or data.get("tags") or []
|
||||
category = sku.get("meal_type") or sku.get("product_type") or ""
|
||||
servings = _num(sku.get("servings"))
|
||||
|
||||
cook_time = sku.get("prep_and_cook_time") or ""
|
||||
description = sku.get("description") or ""
|
||||
|
||||
images = sku.get("hero_images") or sku.get("image_versions") or []
|
||||
# hero_images can be a list OR a dict keyed by size string — normalise to list
|
||||
if isinstance(images, dict):
|
||||
images = list(images.values())
|
||||
image_url = ""
|
||||
if images and isinstance(images[0], dict):
|
||||
image_url = images[0].get("image_url") or images[0].get("url") or ""
|
||||
if not image_url and data.get("square_image"):
|
||||
sq = data["square_image"]
|
||||
image_url = sq.get("url") if isinstance(sq, dict) else ""
|
||||
|
||||
return {
|
||||
"slug": slug,
|
||||
"title": title,
|
||||
"subtitle": subtitle,
|
||||
"steps": steps,
|
||||
"ingredients": ingredients,
|
||||
"category": category,
|
||||
"tags": tags,
|
||||
"calories": calories,
|
||||
"fat": fat,
|
||||
"protein": protein,
|
||||
"sodium": sodium,
|
||||
"sugar": sugar,
|
||||
"carbs": carbs,
|
||||
"fiber": fiber,
|
||||
"servings": servings,
|
||||
"cook_time": cook_time,
|
||||
"description": description,
|
||||
"image_url": image_url,
|
||||
"has_full_recipe": bool(steps and ingredients),
|
||||
}
|
||||
|
||||
|
||||
def _num(val: Any) -> float | None:
|
||||
if val is None:
|
||||
return None
|
||||
try:
|
||||
v = float(str(val).replace("g", "").replace("mg", "").split()[0])
|
||||
return v if v > 0 else None
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
# ── Fallback: HTML inline state parsing ──────────────────────────────────────
|
||||
|
||||
def _extract_from_html(html: str, slug: str) -> dict | None:
|
||||
"""Try to pull recipe data from inline JS state in older SSR pages."""
|
||||
# Attempt 1: Next.js __NEXT_DATA__
|
||||
m = _NEXT_DATA_RE.search(html)
|
||||
if m:
|
||||
try:
|
||||
state = json.loads(m.group(1))
|
||||
# Walk the Next.js page props tree looking for recipe data
|
||||
props = state.get("props", {}).get("pageProps", {})
|
||||
recipe = props.get("recipe") or props.get("product")
|
||||
if recipe and isinstance(recipe, dict) and recipe.get("title"):
|
||||
return _extract_from_api(recipe)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Attempt 2: Redux __INITIAL_STATE__
|
||||
m = _REDUX_STATE_RE.search(html)
|
||||
if m:
|
||||
try:
|
||||
state = json.loads(m.group(1))
|
||||
# Try common Redux state shapes
|
||||
for key in ("recipe", "product", "currentRecipe", "currentProduct"):
|
||||
recipe = state.get(key)
|
||||
if recipe and isinstance(recipe, dict) and recipe.get("title"):
|
||||
return _extract_from_api(recipe)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Attempt 3: JSON-LD structured data
|
||||
ld_matches = re.findall(
|
||||
r'<script[^>]+type=["\']application/ld\+json["\'][^>]*>(.*?)</script>',
|
||||
html, re.DOTALL
|
||||
)
|
||||
for raw in ld_matches:
|
||||
try:
|
||||
ld = json.loads(raw)
|
||||
if isinstance(ld, list):
|
||||
ld = next((x for x in ld if x.get("@type") == "Recipe"), None)
|
||||
if not ld or ld.get("@type") != "Recipe":
|
||||
continue
|
||||
steps = []
|
||||
for inst in (ld.get("recipeInstructions") or []):
|
||||
if isinstance(inst, dict):
|
||||
steps.append(inst.get("text", ""))
|
||||
elif isinstance(inst, str):
|
||||
steps.append(inst)
|
||||
ingredients = ld.get("recipeIngredient") or []
|
||||
return {
|
||||
"slug": slug,
|
||||
"title": ld.get("name", ""),
|
||||
"subtitle": "",
|
||||
"steps": [s for s in steps if s],
|
||||
"ingredients": [i for i in ingredients if i],
|
||||
"category": ld.get("recipeCategory", ""),
|
||||
"tags": ld.get("keywords", "").split(",") if isinstance(ld.get("keywords"), str) else [],
|
||||
"calories": _num((ld.get("nutrition") or {}).get("calories")),
|
||||
"fat": None, "protein": None, "sodium": None,
|
||||
"sugar": None, "carbs": None, "fiber": None,
|
||||
"servings": _num(ld.get("recipeYield")),
|
||||
"cook_time": str(ld.get("totalTime") or ld.get("cookTime") or ""),
|
||||
"description": ld.get("description", ""),
|
||||
"image_url": (ld["image"][0] if isinstance(ld.get("image"), list) else ld.get("image", "")) or "",
|
||||
"has_full_recipe": True,
|
||||
}
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return None
|
||||
|
||||
|
||||
# ── Per-slug fetch ─────────────────────────────────────────────────────────────
|
||||
|
||||
def fetch_recipe(slug: str, manifest_meta: dict) -> dict | None:
|
||||
"""Fetch the fullest available recipe data for a slug from Wayback.
|
||||
|
||||
Returns a merged dict of manifest metadata + API/HTML-extracted content.
|
||||
"""
|
||||
api_url = f"https://{PC_HOST}/api/v1/products/{slug}"
|
||||
html_url = f"https://{PC_HOST}/recipe/{slug}"
|
||||
|
||||
recipe: dict | None = None
|
||||
|
||||
# Try product API — oldest captures are most likely to have full data
|
||||
timestamps = _cdx_timestamps(slug)
|
||||
time.sleep(CDX_DELAY)
|
||||
|
||||
if not timestamps and manifest_meta.get("wayback_ts"):
|
||||
timestamps = [manifest_meta["wayback_ts"]]
|
||||
|
||||
for ts in timestamps:
|
||||
data = _wayback_json(api_url, ts)
|
||||
time.sleep(REPLAY_DELAY)
|
||||
if not data:
|
||||
continue
|
||||
candidate = _extract_from_api(data)
|
||||
if not candidate:
|
||||
continue
|
||||
recipe = candidate
|
||||
if recipe.get("has_full_recipe"):
|
||||
logger.debug("[%s] Full recipe from API (ts=%s)", slug, ts)
|
||||
break
|
||||
logger.debug("[%s] Partial API data (ts=%s) — trying HTML fallback", slug, ts)
|
||||
|
||||
# HTML fallback when API has no steps/ingredients
|
||||
if not recipe or not recipe.get("has_full_recipe"):
|
||||
html_ts_rows = _cdx_get({
|
||||
"url": f"{PC_HOST}/recipe/{slug}",
|
||||
"output": "json",
|
||||
"fl": "timestamp,statuscode",
|
||||
"filter": "statuscode:200",
|
||||
"limit": "10",
|
||||
})
|
||||
html_timestamps = [row[0] for row in html_ts_rows[1:]] if len(html_ts_rows) > 1 else []
|
||||
time.sleep(CDX_DELAY)
|
||||
|
||||
for ts in html_timestamps:
|
||||
html = _wayback_html(html_url, ts)
|
||||
time.sleep(REPLAY_DELAY)
|
||||
if not html:
|
||||
continue
|
||||
html_recipe = _extract_from_html(html, slug)
|
||||
if html_recipe and html_recipe.get("has_full_recipe"):
|
||||
logger.debug("[%s] Full recipe from HTML (ts=%s)", slug, ts)
|
||||
recipe = html_recipe
|
||||
break
|
||||
|
||||
# Build merged record: manifest metadata fills any gaps from API/HTML
|
||||
merged: dict = {
|
||||
"slug": slug,
|
||||
"title": manifest_meta.get("title", ""),
|
||||
"subtitle": manifest_meta.get("subtitle", ""),
|
||||
"steps": [],
|
||||
"ingredients": [],
|
||||
"category": "",
|
||||
"tags": manifest_meta.get("tags") or [],
|
||||
"calories": None,
|
||||
"fat": None,
|
||||
"protein": None,
|
||||
"sodium": None,
|
||||
"sugar": None,
|
||||
"carbs": None,
|
||||
"fiber": None,
|
||||
"servings": manifest_meta.get("serving_size"),
|
||||
"cook_time": manifest_meta.get("cook_time", ""),
|
||||
"description": manifest_meta.get("description", ""),
|
||||
"image_url": manifest_meta.get("image_url", ""),
|
||||
"source": "purple_carrot",
|
||||
"wayback_ts": manifest_meta.get("wayback_ts", ""),
|
||||
"has_full_recipe": False,
|
||||
}
|
||||
|
||||
if recipe:
|
||||
for key in recipe:
|
||||
# Prefer API/HTML data; keep manifest value only when API field is empty
|
||||
val = recipe[key]
|
||||
if val or key not in merged or not merged[key]:
|
||||
merged[key] = val
|
||||
|
||||
if not merged["title"]:
|
||||
logger.warning("[%s] No title — skipping", slug)
|
||||
return None
|
||||
|
||||
return merged
|
||||
|
||||
|
||||
# ── Output formatting ─────────────────────────────────────────────────────────
|
||||
|
||||
def _to_dataframe_row(r: dict) -> dict:
|
||||
"""Convert merged recipe dict to food.com-compatible parquet row."""
|
||||
# Build plain-text input for allrecipes-style corpus compatibility
|
||||
lines = [r["title"]]
|
||||
if r.get("subtitle"):
|
||||
lines.append(r["subtitle"])
|
||||
if r.get("description"):
|
||||
lines.append("")
|
||||
lines.append(r["description"])
|
||||
if r.get("ingredients"):
|
||||
lines += ["", "Ingredients:"] + [f"- {i}" for i in r["ingredients"]]
|
||||
if r.get("steps"):
|
||||
lines += ["", "Directions:"] + [f"- {s}" for s in r["steps"]]
|
||||
plain_text = "\n".join(lines)
|
||||
|
||||
source_url = f"https://www.purplecarrot.com/recipe/{r['slug']}"
|
||||
|
||||
return {
|
||||
# food.com schema columns (used by build_recipe_index.py)
|
||||
"RecipeId": f"pc_{r['slug']}",
|
||||
"Name": r["title"],
|
||||
"RecipeIngredientParts": r.get("ingredients") or [],
|
||||
"RecipeInstructions": r.get("steps") or [],
|
||||
"RecipeCategory": r.get("category", ""),
|
||||
"Keywords": r.get("tags") or [],
|
||||
"Calories": r.get("calories"),
|
||||
"FatContent": r.get("fat"),
|
||||
"ProteinContent": r.get("protein"),
|
||||
"SodiumContent": r.get("sodium"),
|
||||
"SugarContent": r.get("sugar"),
|
||||
"CarbohydrateContent": r.get("carbs"),
|
||||
"FiberContent": r.get("fiber"),
|
||||
"RecipeServings": r.get("servings"),
|
||||
# PC-specific extras (ignored by indexer, used by training pipeline)
|
||||
"Subtitle": r.get("subtitle", ""),
|
||||
"Description": r.get("description", ""),
|
||||
"ImageURL": r.get("image_url", ""),
|
||||
"CookTime": r.get("cook_time", ""),
|
||||
"Slug": r["slug"],
|
||||
"Source": "purple_carrot",
|
||||
"SourceURL": source_url, # canonical attribution link shown in recipe UI
|
||||
"HasFullRecipe": r.get("has_full_recipe", False),
|
||||
"WaybackTs": r.get("wayback_ts", ""),
|
||||
# Also emit plain-text input for allrecipes-compatible corpus search
|
||||
"input": plain_text,
|
||||
}
|
||||
|
||||
|
||||
# ── Main ──────────────────────────────────────────────────────────────────────
|
||||
|
||||
def scrape(slugs_file: Path, out_file: Path, resume: bool = True) -> None:
|
||||
import pandas as pd
|
||||
|
||||
# Load manifest
|
||||
if not slugs_file.exists():
|
||||
logger.error("Slugs manifest not found: %s", slugs_file)
|
||||
return
|
||||
|
||||
manifest: dict[str, dict] = {}
|
||||
with open(slugs_file) as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if line:
|
||||
rec = json.loads(line)
|
||||
slug = rec["slug"]
|
||||
# Keep the richest metadata if slug appears from multiple sources
|
||||
if slug not in manifest or rec.get("source") == "menu":
|
||||
manifest[slug] = rec
|
||||
|
||||
logger.info("Manifest: %d unique slugs", len(manifest))
|
||||
|
||||
# Load already-scraped slugs for resume
|
||||
done_slugs: set[str] = set()
|
||||
existing_rows: list[dict] = []
|
||||
if resume and out_file.exists():
|
||||
try:
|
||||
existing_df = pd.read_parquet(out_file)
|
||||
done_slugs = set(existing_df["Slug"].tolist())
|
||||
existing_rows = existing_df.to_dict("records")
|
||||
logger.info("Resume: %d already scraped", len(done_slugs))
|
||||
except Exception as exc:
|
||||
logger.warning("Could not load existing parquet for resume: %s", exc)
|
||||
|
||||
todo = [s for s in manifest if s not in done_slugs]
|
||||
logger.info("%d slugs to fetch", len(todo))
|
||||
|
||||
rows = list(existing_rows)
|
||||
for i, slug in enumerate(todo, 1):
|
||||
logger.info("[%d/%d] %s", i, len(todo), slug)
|
||||
recipe = fetch_recipe(slug, manifest[slug])
|
||||
if recipe:
|
||||
rows.append(_to_dataframe_row(recipe))
|
||||
status = "full" if recipe.get("has_full_recipe") else "partial"
|
||||
logger.info(" -> %s (%s)", recipe.get("title", "?"), status)
|
||||
else:
|
||||
logger.warning(" -> skipped (no title)")
|
||||
|
||||
# Write checkpoint every 50 recipes
|
||||
if i % 50 == 0:
|
||||
_write_parquet(rows, out_file)
|
||||
logger.info("Checkpoint: %d recipes written", len(rows))
|
||||
|
||||
_write_parquet(rows, out_file)
|
||||
full = sum(1 for r in rows if r.get("HasFullRecipe"))
|
||||
logger.info(
|
||||
"Done. %d recipes written to %s (%d full, %d partial).",
|
||||
len(rows), out_file, full, len(rows) - full,
|
||||
)
|
||||
|
||||
|
||||
def _write_parquet(rows: list[dict], out_file: Path) -> None:
|
||||
import pandas as pd
|
||||
out_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
pd.DataFrame(rows).to_parquet(out_file, index=False)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description="Scrape Purple Carrot recipes from Wayback")
|
||||
parser.add_argument("--slugs", type=Path, default=DEFAULT_SLUGS)
|
||||
parser.add_argument("--out", type=Path, default=DEFAULT_OUT)
|
||||
parser.add_argument(
|
||||
"--no-resume", dest="resume", action="store_false",
|
||||
help="Start fresh (ignore existing parquet)",
|
||||
)
|
||||
parser.add_argument("--debug", action="store_true")
|
||||
args = parser.parse_args()
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG if args.debug else logging.INFO,
|
||||
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
|
||||
)
|
||||
|
||||
from scripts.pipeline.log_utils import attach_pipeline_log
|
||||
attach_pipeline_log("scrape_recipes")
|
||||
|
||||
scrape(args.slugs, args.out, resume=args.resume)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
@ -1,41 +0,0 @@
|
|||
#!/usr/bin/env bash
|
||||
# Weekly Purple Carrot recipe harvest
|
||||
# Runs every Sunday night via cron.
|
||||
# Discovers this week's menu and scrapes full recipe data.
|
||||
# Logs to /Library/Assets/kiwi/pipeline/logs/purple_carrot_harvest.log
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
REPO="/Library/Development/CircuitForge/kiwi"
|
||||
MENU_OUT="/Library/Assets/kiwi/pipeline/recipes_purplecarrot_menu.parquet"
|
||||
LIVE_OUT="/Library/Assets/kiwi/pipeline/recipes_purplecarrot_live.parquet"
|
||||
LOG_DIR="/Library/Assets/kiwi/pipeline/logs"
|
||||
LOG="$LOG_DIR/purple_carrot_harvest.log"
|
||||
|
||||
mkdir -p "$LOG_DIR"
|
||||
|
||||
echo "=== Purple Carrot harvest $(date -u '+%Y-%m-%d %H:%M UTC') ===" >> "$LOG"
|
||||
|
||||
cd "$REPO"
|
||||
|
||||
# Step 1: discover this week's menu slugs
|
||||
echo "[1/2] Discovering current menu slugs..." | tee -a "$LOG"
|
||||
conda run -n cf python3 scripts/pipeline/purple_carrot/discover_current_menu.py \
|
||||
--out "$MENU_OUT" 2>&1 | tee -a "$LOG"
|
||||
|
||||
# Step 2: scrape full recipe data for new slugs only (--resume skips already-scraped)
|
||||
echo "[2/2] Scraping live recipe pages..." | tee -a "$LOG"
|
||||
conda run -n cf python3 scripts/pipeline/purple_carrot/scrape_live.py \
|
||||
--slugs-from "$MENU_OUT" \
|
||||
--out "$LIVE_OUT" \
|
||||
--resume \
|
||||
--delay 3.0 2>&1 | tee -a "$LOG"
|
||||
|
||||
# Step 3: ingest new recipes into the shared corpus DB
|
||||
echo "[3/3] Ingesting into corpus DB..." | tee -a "$LOG"
|
||||
conda run -n cf python3 scripts/pipeline/ingest_purplecarrot.py \
|
||||
--parquet "$LIVE_OUT" \
|
||||
--db /Library/Assets/kiwi/kiwi.db 2>&1 | tee -a "$LOG"
|
||||
|
||||
echo "=== Done $(date -u '+%Y-%m-%d %H:%M UTC') ===" >> "$LOG"
|
||||
echo "" >> "$LOG"
|
||||
Loading…
Reference in a new issue