Compare commits
2 commits
84636bcdaf
...
a9ab996bcc
| Author | SHA1 | Date | |
|---|---|---|---|
| a9ab996bcc | |||
| 56f942b3fd |
6 changed files with 710 additions and 42 deletions
68
scripts/pipeline/log_utils.py
Normal file
68
scripts/pipeline/log_utils.py
Normal file
|
|
@ -0,0 +1,68 @@
|
|||
"""
|
||||
Pipeline logging utility.
|
||||
|
||||
Adds a structured JSON FileHandler to the root logger so every pipeline
|
||||
script automatically writes machine-readable logs to the shared datastore
|
||||
at /Library/Assets/logs/pipeline/. Avocet ingests these for Turnstone
|
||||
logreading training (kiwi#141 / avocet#67).
|
||||
|
||||
Usage (add near the top of main() after logging.basicConfig):
|
||||
|
||||
from scripts.pipeline.log_utils import attach_pipeline_log
|
||||
attach_pipeline_log("scrape_recipes")
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
PIPELINE_LOG_DIR = Path(
|
||||
os.environ.get("PIPELINE_LOG_DIR", "/Library/Assets/logs/pipeline")
|
||||
)
|
||||
|
||||
|
||||
class _JsonFormatter(logging.Formatter):
|
||||
def format(self, record: logging.LogRecord) -> str:
|
||||
payload: dict = {
|
||||
"ts": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
|
||||
"level": record.levelname,
|
||||
"logger": record.name,
|
||||
"msg": record.getMessage(),
|
||||
}
|
||||
if record.exc_info:
|
||||
payload["exc"] = self.formatException(record.exc_info)
|
||||
# Any extra kwargs passed via logger.info("...", extra={...})
|
||||
standard = {
|
||||
"name", "msg", "args", "levelname", "levelno", "pathname",
|
||||
"filename", "module", "exc_info", "exc_text", "stack_info",
|
||||
"lineno", "funcName", "created", "msecs", "relativeCreated",
|
||||
"thread", "threadName", "processName", "process", "message",
|
||||
"taskName",
|
||||
}
|
||||
extra = {k: v for k, v in record.__dict__.items() if k not in standard}
|
||||
if extra:
|
||||
payload["extra"] = extra
|
||||
return json.dumps(payload)
|
||||
|
||||
|
||||
def attach_pipeline_log(script_name: str) -> Path:
|
||||
"""Attach a JSON file handler to the root logger for pipeline logging.
|
||||
|
||||
Returns the path of the log file created.
|
||||
"""
|
||||
PIPELINE_LOG_DIR.mkdir(parents=True, exist_ok=True)
|
||||
ts = datetime.now(tz=timezone.utc).strftime("%Y%m%dT%H%M%S")
|
||||
log_path = PIPELINE_LOG_DIR / f"{script_name}_{ts}.jsonl"
|
||||
|
||||
handler = logging.FileHandler(log_path, encoding="utf-8")
|
||||
handler.setLevel(logging.DEBUG)
|
||||
handler.setFormatter(_JsonFormatter())
|
||||
logging.getLogger().addHandler(handler)
|
||||
|
||||
logging.getLogger(__name__).info(
|
||||
"Pipeline log: %s", log_path, extra={"script": script_name}
|
||||
)
|
||||
return log_path
|
||||
120
scripts/pipeline/purple_carrot/discover_current_menu.py
Normal file
120
scripts/pipeline/purple_carrot/discover_current_menu.py
Normal file
|
|
@ -0,0 +1,120 @@
|
|||
"""Discover Purple Carrot's current weekly menu recipe slugs.
|
||||
|
||||
The main /plant-based-recipes listing page always renders the current week's
|
||||
menu as server-side HTML. This script pulls those slugs and writes them to a
|
||||
parquet that can be passed directly to scrape_live.py via --slugs-from.
|
||||
|
||||
Run weekly (e.g. via cron) to accumulate new recipes as the menu rotates.
|
||||
|
||||
Usage:
|
||||
conda run -n cf python3 scripts/pipeline/purple_carrot/discover_current_menu.py \
|
||||
[--out /Library/Assets/kiwi/pipeline/recipes_purplecarrot_menu.parquet]
|
||||
|
||||
Then scrape:
|
||||
conda run -n cf python3 scripts/pipeline/purple_carrot/scrape_live.py \
|
||||
--slugs-from /Library/Assets/kiwi/pipeline/recipes_purplecarrot_menu.parquet \
|
||||
--out /Library/Assets/kiwi/pipeline/recipes_purplecarrot_live.parquet \
|
||||
--resume
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
import sys
|
||||
from datetime import date
|
||||
from pathlib import Path
|
||||
|
||||
import pandas as pd
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
# ── Config ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
LISTING_URL = "https://www.purplecarrot.com/plant-based-recipes"
|
||||
BASE_URL = "https://www.purplecarrot.com/recipe/{slug}"
|
||||
|
||||
DEFAULT_OUT = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot_menu.parquet")
|
||||
|
||||
HEADERS = {
|
||||
"User-Agent": (
|
||||
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
|
||||
"(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
|
||||
),
|
||||
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
|
||||
"Accept-Language": "en-US,en;q=0.5",
|
||||
}
|
||||
|
||||
RECIPE_HREF_RE = re.compile(r"/recipe/([^?#]+)")
|
||||
|
||||
|
||||
# ── Main ───────────────────────────────────────────────────────────────────────
|
||||
|
||||
def discover_current_slugs() -> list[str]:
|
||||
"""Fetch the listing page and return unique recipe slugs from the current menu."""
|
||||
resp = requests.get(LISTING_URL, headers=HEADERS, timeout=15)
|
||||
if resp.status_code != 200:
|
||||
print(f"ERROR: listing page returned HTTP {resp.status_code}", file=sys.stderr)
|
||||
return []
|
||||
|
||||
soup = BeautifulSoup(resp.text, "html.parser")
|
||||
slugs: list[str] = []
|
||||
seen: set[str] = set()
|
||||
for a in soup.find_all("a", href=RECIPE_HREF_RE):
|
||||
m = RECIPE_HREF_RE.search(a["href"])
|
||||
if m:
|
||||
slug = m.group(1)
|
||||
if slug not in seen:
|
||||
seen.add(slug)
|
||||
slugs.append(slug)
|
||||
return slugs
|
||||
|
||||
|
||||
def main() -> None:
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--out", type=Path, default=DEFAULT_OUT)
|
||||
args = parser.parse_args()
|
||||
|
||||
print(f"Fetching current menu from {LISTING_URL} …")
|
||||
slugs = discover_current_slugs()
|
||||
|
||||
if not slugs:
|
||||
print("No slugs found — the listing page may have changed structure or blocked the request.")
|
||||
sys.exit(1)
|
||||
|
||||
today = date.today().isoformat()
|
||||
records = [
|
||||
{
|
||||
"Slug": slug,
|
||||
"SourceURL": BASE_URL.format(slug=slug),
|
||||
"Source": "purplecarrot_menu",
|
||||
"DiscoveredDate": today,
|
||||
}
|
||||
for slug in slugs
|
||||
]
|
||||
|
||||
# Merge with any existing menu parquet (accumulate weeks)
|
||||
df_new = pd.DataFrame(records)
|
||||
args.out.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
if args.out.exists():
|
||||
df_prev = pd.read_parquet(args.out)
|
||||
combined = pd.concat([df_prev, df_new], ignore_index=True)
|
||||
combined = combined.drop_duplicates(subset=["Slug"], keep="first")
|
||||
df_new = combined
|
||||
|
||||
df_new.to_parquet(args.out, index=False)
|
||||
|
||||
print(f"Found {len(slugs)} current-menu slugs this week:")
|
||||
for s in slugs:
|
||||
print(f" {s}")
|
||||
print(f"\nSaved {len(df_new)} total slugs (accumulated) to {args.out}")
|
||||
print(f"\nTo scrape full recipes:")
|
||||
print(f" conda run -n cf python3 scripts/pipeline/purple_carrot/scrape_live.py \\")
|
||||
print(f" --slugs-from {args.out} \\")
|
||||
print(f" --out /Library/Assets/kiwi/pipeline/recipes_purplecarrot_live.parquet \\")
|
||||
print(f" --resume")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
218
scripts/pipeline/purple_carrot/discover_slugs_categories.py
Normal file
218
scripts/pipeline/purple_carrot/discover_slugs_categories.py
Normal file
|
|
@ -0,0 +1,218 @@
|
|||
"""Discover Purple Carrot recipe slugs by crawling all recipe-category listing pages.
|
||||
|
||||
The site serves full server-rendered HTML for category pages, paginated via
|
||||
?page=N. Each page loads 18 recipe cards. This script crawls every category
|
||||
across all pages and writes a deduplicated slug inventory.
|
||||
|
||||
Usage:
|
||||
conda run -n cf python3 scripts/pipeline/purple_carrot/discover_slugs_categories.py \
|
||||
[--out /Library/Assets/kiwi/pipeline/recipes_purplecarrot_slugs.parquet] \
|
||||
[--delay 2.0] \
|
||||
[--max-pages 50] # safety cap per category (comfort-foods has ~18)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import re
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import pandas as pd
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
# ── Config ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
BASE = "https://www.purplecarrot.com"
|
||||
|
||||
# All known category slugs (from /plant-based-recipes nav)
|
||||
CATEGORIES: list[str] = [
|
||||
"comfort-foods",
|
||||
"family-friendly",
|
||||
"healthy-desserts",
|
||||
"holiday-recipes",
|
||||
"quick-and-easy",
|
||||
"party-foods",
|
||||
"seasonal-menu",
|
||||
"spring-recipes",
|
||||
"summer-recipes",
|
||||
"fall-recipes",
|
||||
"winter-recipes",
|
||||
"african",
|
||||
"american",
|
||||
"asian",
|
||||
"comfort",
|
||||
"french",
|
||||
"indian",
|
||||
"italian",
|
||||
"mediterranean",
|
||||
"mexican",
|
||||
"middle-eastern",
|
||||
"soups",
|
||||
"salads",
|
||||
"bowls",
|
||||
"pasta",
|
||||
"sandwiches-wraps",
|
||||
"tacos",
|
||||
"breakfast",
|
||||
"snacks-sides",
|
||||
]
|
||||
|
||||
DEFAULT_OUT = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot_slugs.parquet")
|
||||
EXISTING_PARQUET = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot.parquet")
|
||||
|
||||
RECIPE_LINK_SELECTOR = "a.c-recipe__title"
|
||||
SLUG_RE = re.compile(r"/recipe/([^?#]+)")
|
||||
|
||||
HEADERS = {
|
||||
"User-Agent": (
|
||||
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
|
||||
"(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
|
||||
),
|
||||
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
|
||||
"Accept-Language": "en-US,en;q=0.5",
|
||||
}
|
||||
|
||||
|
||||
# ── Helpers ────────────────────────────────────────────────────────────────────
|
||||
|
||||
def _fetch_html(url: str, session: requests.Session) -> str | None:
|
||||
"""Fetch URL and return HTML string, or None on failure."""
|
||||
try:
|
||||
resp = session.get(url, headers=HEADERS, timeout=15)
|
||||
if resp.status_code == 200:
|
||||
return resp.text
|
||||
if resp.status_code == 404:
|
||||
return None # expected end of pagination
|
||||
print(f" HTTP {resp.status_code} — {url}")
|
||||
return None
|
||||
except Exception as exc:
|
||||
print(f" ERROR fetching {url}: {exc}")
|
||||
return None
|
||||
|
||||
|
||||
def _extract_slugs(html: str) -> list[str]:
|
||||
"""Pull recipe slugs from one listing-page HTML response."""
|
||||
soup = BeautifulSoup(html, "html.parser")
|
||||
slugs: list[str] = []
|
||||
for a in soup.select(RECIPE_LINK_SELECTOR):
|
||||
href = a.get("href", "")
|
||||
m = SLUG_RE.search(href)
|
||||
if m:
|
||||
slugs.append(m.group(1))
|
||||
return slugs
|
||||
|
||||
|
||||
def _get_category_total(html: str) -> int | None:
|
||||
"""Try to parse the recipe count shown on the category page (e.g. '319 Recipes')."""
|
||||
m = re.search(r"(\d+)\s+Recipes?\b", html)
|
||||
return int(m.group(1)) if m else None
|
||||
|
||||
|
||||
def _discover_category(
|
||||
category: str,
|
||||
session: requests.Session,
|
||||
delay: float,
|
||||
max_pages: int,
|
||||
) -> tuple[list[str], int]:
|
||||
"""Crawl all pages of a category, return (slugs, pages_fetched)."""
|
||||
slugs: list[str] = []
|
||||
for page_num in range(1, max_pages + 1):
|
||||
if page_num == 1:
|
||||
url = f"{BASE}/recipe-categories/{category}"
|
||||
else:
|
||||
url = f"{BASE}/recipe-categories/{category}?page={page_num}"
|
||||
|
||||
html = _fetch_html(url, session)
|
||||
if html is None:
|
||||
break # 404 or error = past the end
|
||||
|
||||
page_slugs = _extract_slugs(html)
|
||||
if not page_slugs:
|
||||
# Show total if we got a page but no links (category slug may be wrong)
|
||||
if page_num == 1:
|
||||
total = _get_category_total(html)
|
||||
if total is not None:
|
||||
print(f" page 1 loaded (total={total}) but 0 recipe links — selector may need updating")
|
||||
break
|
||||
|
||||
slugs.extend(page_slugs)
|
||||
|
||||
# Print progress
|
||||
total_hint = _get_category_total(html) if page_num == 1 else None
|
||||
total_str = f" / {total_hint}" if total_hint else ""
|
||||
print(f" page {page_num}: +{len(page_slugs)} slugs ({len(slugs)}{total_str} cumulative)")
|
||||
|
||||
if len(page_slugs) < 18:
|
||||
# Short page = last page
|
||||
break
|
||||
|
||||
time.sleep(delay)
|
||||
|
||||
return slugs, (len(slugs) + 17) // 18 # approximate pages
|
||||
|
||||
|
||||
# ── Main ───────────────────────────────────────────────────────────────────────
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--out", type=Path, default=DEFAULT_OUT)
|
||||
parser.add_argument("--delay", type=float, default=2.0,
|
||||
help="Seconds between page requests")
|
||||
parser.add_argument("--max-pages", type=int, default=50,
|
||||
help="Safety cap on pages per category")
|
||||
parser.add_argument("--categories", nargs="*",
|
||||
help="Crawl only these category slugs (default: all)")
|
||||
args = parser.parse_args()
|
||||
|
||||
categories = args.categories or CATEGORIES
|
||||
|
||||
# Seed with any slugs from the Wayback parquet
|
||||
known_slugs: set[str] = set()
|
||||
if EXISTING_PARQUET.exists():
|
||||
df_wb = pd.read_parquet(EXISTING_PARQUET)
|
||||
known_slugs = set(df_wb["Slug"].dropna().tolist())
|
||||
print(f"Seeded with {len(known_slugs)} slugs from Wayback parquet")
|
||||
|
||||
all_records: list[dict[str, Any]] = []
|
||||
session = requests.Session()
|
||||
|
||||
for category in categories:
|
||||
print(f"\n[{category}]")
|
||||
cat_slugs, pages = _discover_category(category, session, args.delay, args.max_pages)
|
||||
for slug in cat_slugs:
|
||||
all_records.append({"Slug": slug, "Category": category, "Source": "purplecarrot_category"})
|
||||
print(f" → {len(cat_slugs)} slugs across ~{pages} pages")
|
||||
time.sleep(args.delay)
|
||||
|
||||
if not all_records:
|
||||
print("\nNo records found — check that categories are correct and the site is accessible")
|
||||
return
|
||||
|
||||
# Deduplicate keeping first category encountered
|
||||
df_new = pd.DataFrame(all_records)
|
||||
df_new = df_new.drop_duplicates(subset=["Slug"], keep="first")
|
||||
|
||||
# Also include Wayback slugs not already in the new set
|
||||
if known_slugs:
|
||||
wb_only = known_slugs - set(df_new["Slug"].tolist())
|
||||
if wb_only:
|
||||
df_wb_extra = pd.DataFrame([
|
||||
{"Slug": s, "Category": "wayback", "Source": "purplecarrot_wayback"}
|
||||
for s in wb_only
|
||||
])
|
||||
df_new = pd.concat([df_new, df_wb_extra], ignore_index=True)
|
||||
|
||||
args.out.parent.mkdir(parents=True, exist_ok=True)
|
||||
df_new.to_parquet(args.out, index=False)
|
||||
|
||||
new_count = len(df_new)
|
||||
cat_count = len(df_new[df_new["Source"] == "purplecarrot_category"])
|
||||
print(f"\nDone — {new_count} total slugs saved to {args.out}")
|
||||
print(f" {cat_count} from category pages, {new_count - cat_count} from Wayback only")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
@ -31,7 +31,7 @@ import requests
|
|||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
CDX_BASE = "http://web.archive.org/cdx/search/cdx"
|
||||
CDX_BASE = "https://web.archive.org/cdx/search/cdx"
|
||||
WB_BASE = "https://web.archive.org/web"
|
||||
PC_HOST = "www.purplecarrot.com"
|
||||
|
||||
|
|
@ -291,6 +291,9 @@ def main() -> None:
|
|||
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
|
||||
)
|
||||
|
||||
from scripts.pipeline.log_utils import attach_pipeline_log
|
||||
attach_pipeline_log("discover_wayback")
|
||||
|
||||
discover(args.out)
|
||||
|
||||
|
||||
|
|
|
|||
250
scripts/pipeline/purple_carrot/scrape_live.py
Normal file
250
scripts/pipeline/purple_carrot/scrape_live.py
Normal file
|
|
@ -0,0 +1,250 @@
|
|||
"""Playwright scraper for live purplecarrot.com recipe pages.
|
||||
|
||||
Uses the slug inventory already in recipes_purplecarrot.parquet and fills in
|
||||
the missing ingredients/instructions by hitting the live site directly.
|
||||
|
||||
Usage:
|
||||
conda run -n cf python3 scripts/pipeline/purple_carrot/scrape_live.py \
|
||||
[--out /Library/Assets/kiwi/pipeline/recipes_purplecarrot_live.parquet] \
|
||||
[--delay 2.5] \
|
||||
[--limit 20]
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import re
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import pandas as pd
|
||||
from playwright.sync_api import sync_playwright, Page, TimeoutError as PWTimeout
|
||||
|
||||
# ── Config ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
BASE_URL = "https://www.purplecarrot.com/recipe/{slug}"
|
||||
DEFAULT_OUT = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot_live.parquet")
|
||||
EXISTING_PARQUET = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot.parquet")
|
||||
|
||||
RENDER_WAIT_MS = 2500 # JS render settle time
|
||||
NAV_TIMEOUT_MS = 20_000
|
||||
|
||||
|
||||
# ── Page parser ────────────────────────────────────────────────────────────────
|
||||
|
||||
def _text(page: Page, selector: str) -> str:
|
||||
el = page.query_selector(selector)
|
||||
return el.inner_text().strip() if el else ""
|
||||
|
||||
|
||||
def _texts(page: Page, selector: str) -> list[str]:
|
||||
return [el.inner_text().strip() for el in page.query_selector_all(selector)]
|
||||
|
||||
|
||||
def _parse_recipe(page: Page, slug: str, source_url: str) -> dict[str, Any] | None:
|
||||
"""Extract structured recipe data from the rendered page."""
|
||||
body = page.inner_text("body")
|
||||
|
||||
# Abort if we've been bounced to a generic listing / 404
|
||||
if "Page Not Found" in body or slug not in page.url:
|
||||
return None
|
||||
|
||||
# ── Title ──────────────────────────────────────────────────────────────────
|
||||
# The <h1> on product pages tends to be the recipe name
|
||||
title = (_text(page, "h1") or _text(page, "[class*='recipe-title']")).strip()
|
||||
if not title:
|
||||
# Fallback: first heading-like text before "Ingredients"
|
||||
idx = body.find("Ingredients\n")
|
||||
title = body[:idx].strip().splitlines()[-1] if idx > 0 else ""
|
||||
|
||||
# ── Ingredients / Instructions via body text ───────────────────────────────
|
||||
ing_start = body.find("\nIngredients\n")
|
||||
inst_start = body.find("\nInstructions\n")
|
||||
footer_start = body.find("\nShop\n") # footer sentinel
|
||||
|
||||
if ing_start == -1:
|
||||
return None # page didn't render recipe content
|
||||
|
||||
raw_ingredients: list[str] = []
|
||||
raw_instructions: list[str] = []
|
||||
|
||||
if ing_start != -1 and inst_start != -1:
|
||||
ing_block = body[ing_start + len("\nIngredients\n"):inst_start].strip()
|
||||
raw_ingredients = [l.strip() for l in ing_block.splitlines() if l.strip()]
|
||||
|
||||
if inst_start != -1:
|
||||
end = footer_start if footer_start > inst_start else len(body)
|
||||
inst_block = body[inst_start + len("\nInstructions\n"):end].strip()
|
||||
# Steps start with a digit
|
||||
steps: list[str] = []
|
||||
current: list[str] = []
|
||||
for line in inst_block.splitlines():
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
if re.match(r"^\d+$", line):
|
||||
if current:
|
||||
steps.append(" ".join(current))
|
||||
current = []
|
||||
elif line.startswith("CULINARY NOTES"):
|
||||
break
|
||||
else:
|
||||
current.append(line)
|
||||
if current:
|
||||
steps.append(" ".join(current))
|
||||
raw_instructions = steps
|
||||
|
||||
# ── Nutrition ──────────────────────────────────────────────────────────────
|
||||
def _extract_num(pattern: str) -> float | None:
|
||||
m = re.search(pattern, body)
|
||||
try:
|
||||
return float(m.group(1)) if m else None
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
cal = _extract_num(r"(\d+)\s*CAL")
|
||||
fat = _extract_num(r"(\d+(?:\.\d+)?)g\s*FAT")
|
||||
carbs = _extract_num(r"(\d+(?:\.\d+)?)g\s*CARBS")
|
||||
prot = _extract_num(r"(\d+(?:\.\d+)?)g\s*PROTEIN")
|
||||
fiber = _extract_num(r"(\d+(?:\.\d+)?)g\s*FIBER")
|
||||
|
||||
# ── Allergens / tags ───────────────────────────────────────────────────────
|
||||
allergen_m = re.search(r"Allergens?:\s*([^\n]+)", body)
|
||||
allergens = allergen_m.group(1).strip() if allergen_m else ""
|
||||
|
||||
# Feature tags like HIGH-PROTEIN, QUICK, etc. appear before Ingredients
|
||||
pre_ing = body[:ing_start]
|
||||
tags = re.findall(r"\b(HIGH-PROTEIN|QUICK|SPICY|LOW[\-\s]CALORIE|VEGAN|FAMILY\s+FRIENDLY)\b", pre_ing)
|
||||
|
||||
return {
|
||||
"Slug": slug,
|
||||
"Name": title,
|
||||
"SourceURL": source_url,
|
||||
"Source": "purplecarrot_live",
|
||||
"RecipeIngredientParts": raw_ingredients,
|
||||
"RecipeInstructions": raw_instructions,
|
||||
"Calories": cal,
|
||||
"FatContent": fat,
|
||||
"CarbohydrateContent": carbs,
|
||||
"ProteinContent": prot,
|
||||
"FiberContent": fiber,
|
||||
"Allergens": allergens,
|
||||
"Keywords": tags,
|
||||
"HasFullRecipe": bool(raw_ingredients and raw_instructions),
|
||||
}
|
||||
|
||||
|
||||
# ── Main ───────────────────────────────────────────────────────────────────────
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--out", type=Path, default=DEFAULT_OUT)
|
||||
parser.add_argument("--delay", type=float, default=2.5,
|
||||
help="Seconds between requests (be polite)")
|
||||
parser.add_argument("--limit", type=int, default=0,
|
||||
help="Stop after N slugs (0 = all)")
|
||||
parser.add_argument("--resume", action="store_true",
|
||||
help="Skip slugs already present in --out")
|
||||
parser.add_argument("--slugs-from", type=Path, default=None,
|
||||
help="Read slug inventory from this parquet instead of the default Wayback one")
|
||||
args = parser.parse_args()
|
||||
|
||||
# Load slug inventory — either from a custom parquet or the default Wayback run
|
||||
slugs_parquet = args.slugs_from if args.slugs_from else EXISTING_PARQUET
|
||||
df_existing = pd.read_parquet(slugs_parquet)
|
||||
slugs = df_existing["Slug"].dropna().unique().tolist()
|
||||
# source_urls may not be present in custom parcets — fall back to constructing from slug
|
||||
if "SourceURL" in df_existing.columns:
|
||||
source_urls = dict(zip(df_existing["Slug"], df_existing["SourceURL"]))
|
||||
else:
|
||||
source_urls = {s: BASE_URL.format(slug=s) for s in slugs}
|
||||
|
||||
# Resume support
|
||||
done_slugs: set[str] = set()
|
||||
if args.resume and args.out.exists():
|
||||
df_done = pd.read_parquet(args.out)
|
||||
done_slugs = set(df_done["Slug"].dropna().tolist())
|
||||
print(f"Resuming — {len(done_slugs)} slugs already scraped")
|
||||
|
||||
if args.limit:
|
||||
slugs = slugs[: args.limit]
|
||||
|
||||
results: list[dict[str, Any]] = []
|
||||
skipped = 0
|
||||
failed = 0
|
||||
|
||||
_UA = (
|
||||
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
|
||||
"(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
|
||||
)
|
||||
|
||||
with sync_playwright() as p:
|
||||
browser = p.chromium.launch(headless=True)
|
||||
|
||||
for i, slug in enumerate(slugs):
|
||||
if slug in done_slugs:
|
||||
skipped += 1
|
||||
continue
|
||||
|
||||
url = BASE_URL.format(slug=slug)
|
||||
print(f"[{i+1}/{len(slugs)}] {slug} … ", end="", flush=True)
|
||||
|
||||
# Use a fresh browser context per slug to avoid Cloudflare session-level
|
||||
# bot detection, which fires on the 2nd+ request in the same context.
|
||||
context = browser.new_context(
|
||||
user_agent=_UA,
|
||||
viewport={"width": 1280, "height": 900},
|
||||
)
|
||||
page = context.new_page()
|
||||
|
||||
try:
|
||||
page.goto(url, timeout=NAV_TIMEOUT_MS, wait_until="domcontentloaded")
|
||||
page.wait_for_timeout(RENDER_WAIT_MS)
|
||||
recipe = _parse_recipe(page, slug, source_urls.get(slug, url))
|
||||
except PWTimeout:
|
||||
print("TIMEOUT")
|
||||
failed += 1
|
||||
except Exception as exc:
|
||||
print(f"ERROR: {exc}")
|
||||
failed += 1
|
||||
else:
|
||||
if recipe is None:
|
||||
print("no content (404 or redirect)")
|
||||
failed += 1
|
||||
elif recipe["HasFullRecipe"]:
|
||||
n = len(recipe["RecipeIngredientParts"])
|
||||
s = len(recipe["RecipeInstructions"])
|
||||
print(f"OK ({n} ingredients, {s} steps)")
|
||||
results.append(recipe)
|
||||
else:
|
||||
print(f"partial (ings={len(recipe['RecipeIngredientParts'])}, steps={len(recipe['RecipeInstructions'])})")
|
||||
results.append(recipe)
|
||||
finally:
|
||||
context.close()
|
||||
|
||||
time.sleep(args.delay)
|
||||
|
||||
browser.close()
|
||||
|
||||
print(f"\nDone — {len(results)} scraped, {skipped} skipped, {failed} failed")
|
||||
|
||||
if results:
|
||||
df_out = pd.DataFrame(results)
|
||||
# Merge with existing metadata (nutrition stubs, wayback fields) for slugs
|
||||
# that didn't previously have full data
|
||||
args.out.parent.mkdir(parents=True, exist_ok=True)
|
||||
if args.resume and args.out.exists():
|
||||
df_prev = pd.read_parquet(args.out)
|
||||
df_out = pd.concat([df_prev, df_out], ignore_index=True)
|
||||
df_out = df_out.drop_duplicates(subset=["Slug"], keep="last")
|
||||
df_out.to_parquet(args.out, index=False)
|
||||
full_count = df_out["HasFullRecipe"].sum() if "HasFullRecipe" in df_out.columns else "?"
|
||||
print(f"Saved {len(df_out)} rows to {args.out} ({full_count} with full recipes)")
|
||||
else:
|
||||
print("No results — output not written")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
@ -37,12 +37,12 @@ import requests
|
|||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
CDX_BASE = "http://web.archive.org/cdx/search/cdx"
|
||||
CDX_BASE = "https://web.archive.org/cdx/search/cdx"
|
||||
WB_BASE = "https://web.archive.org/web"
|
||||
PC_HOST = "www.purplecarrot.com"
|
||||
|
||||
REPLAY_DELAY = 1.2
|
||||
CDX_DELAY = 0.5
|
||||
REPLAY_DELAY = 2.0
|
||||
CDX_DELAY = 3.0 # archive.org CDX rate-limits aggressively; be polite
|
||||
|
||||
DEFAULT_SLUGS = Path("/Library/Assets/kiwi/pipeline/pc_slugs.jsonl")
|
||||
DEFAULT_OUT = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot.parquet")
|
||||
|
|
@ -54,29 +54,41 @@ _REDUX_STATE_RE = re.compile(r'window\.__INITIAL_STATE__\s*=\s*(\{.*?\});\s*\n',
|
|||
|
||||
# ── Wayback helpers ───────────────────────────────────────────────────────────
|
||||
|
||||
def _cdx_get(params: dict) -> list:
|
||||
"""CDX request with retry on 429/503 (archive.org rate-limits aggressively)."""
|
||||
for attempt in range(4):
|
||||
try:
|
||||
resp = requests.get(CDX_BASE, params=params, timeout=25)
|
||||
if resp.status_code in (429, 503):
|
||||
wait = 15 * (2 ** attempt)
|
||||
logger.debug("CDX %s — backing off %ds", resp.status_code, wait)
|
||||
time.sleep(wait)
|
||||
continue
|
||||
resp.raise_for_status()
|
||||
rows = resp.json()
|
||||
return rows if rows else []
|
||||
except Exception as exc:
|
||||
logger.debug("CDX attempt %d failed: %s", attempt + 1, exc)
|
||||
time.sleep(5 * (attempt + 1))
|
||||
return []
|
||||
|
||||
|
||||
def _cdx_timestamps(slug: str) -> list[str]:
|
||||
"""Return all captured timestamps for a product slug, oldest first."""
|
||||
url = f"{PC_HOST}/api/v1/products/{slug}"
|
||||
try:
|
||||
resp = requests.get(
|
||||
CDX_BASE,
|
||||
params={
|
||||
"url": url,
|
||||
"output": "json",
|
||||
"fl": "timestamp,statuscode",
|
||||
"filter": "statuscode:200",
|
||||
"limit": "20",
|
||||
},
|
||||
timeout=20,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
rows = resp.json()
|
||||
if len(rows) < 2:
|
||||
return []
|
||||
return [row[0] for row in rows[1:]] # timestamps only, oldest first
|
||||
except Exception as exc:
|
||||
logger.debug("CDX timestamps failed for %s: %s", slug, exc)
|
||||
"""Return captured timestamps for a product slug, oldest first (pre-2022 window)."""
|
||||
rows = _cdx_get({
|
||||
"url": f"{PC_HOST}/api/v1/products/{slug}",
|
||||
"output": "json",
|
||||
"fl": "timestamp,statuscode",
|
||||
"filter": "statuscode:200",
|
||||
"limit": "20",
|
||||
# Pre-HelloFresh-acquisition captures (2019-2021) are most likely
|
||||
# to have full instructions — API stripped them post-acquisition.
|
||||
"from": "20190101",
|
||||
"to": "20211231",
|
||||
})
|
||||
if len(rows) < 2:
|
||||
return []
|
||||
return [row[0] for row in rows[1:]] # timestamps only, oldest first
|
||||
|
||||
|
||||
def _wayback_json(url: str, timestamp: str) -> Any | None:
|
||||
|
|
@ -172,6 +184,9 @@ def _extract_from_api(data: dict) -> dict | None:
|
|||
description = sku.get("description") or ""
|
||||
|
||||
images = sku.get("hero_images") or sku.get("image_versions") or []
|
||||
# hero_images can be a list OR a dict keyed by size string — normalise to list
|
||||
if isinstance(images, dict):
|
||||
images = list(images.values())
|
||||
image_url = ""
|
||||
if images and isinstance(images[0], dict):
|
||||
image_url = images[0].get("image_url") or images[0].get("url") or ""
|
||||
|
|
@ -319,23 +334,14 @@ def fetch_recipe(slug: str, manifest_meta: dict) -> dict | None:
|
|||
|
||||
# HTML fallback when API has no steps/ingredients
|
||||
if not recipe or not recipe.get("has_full_recipe"):
|
||||
html_cdx_url = f"{PC_HOST}/recipe/{slug}"
|
||||
try:
|
||||
html_resp = requests.get(
|
||||
CDX_BASE,
|
||||
params={
|
||||
"url": html_cdx_url,
|
||||
"output": "json",
|
||||
"fl": "timestamp,statuscode",
|
||||
"filter": "statuscode:200",
|
||||
"limit": "5",
|
||||
},
|
||||
timeout=20,
|
||||
)
|
||||
html_ts_rows = html_resp.json() if html_resp.ok else []
|
||||
html_timestamps = [row[0] for row in html_ts_rows[1:]] if len(html_ts_rows) > 1 else []
|
||||
except Exception:
|
||||
html_timestamps = []
|
||||
html_ts_rows = _cdx_get({
|
||||
"url": f"{PC_HOST}/recipe/{slug}",
|
||||
"output": "json",
|
||||
"fl": "timestamp,statuscode",
|
||||
"filter": "statuscode:200",
|
||||
"limit": "10",
|
||||
})
|
||||
html_timestamps = [row[0] for row in html_ts_rows[1:]] if len(html_ts_rows) > 1 else []
|
||||
time.sleep(CDX_DELAY)
|
||||
|
||||
for ts in html_timestamps:
|
||||
|
|
@ -522,6 +528,9 @@ def main() -> None:
|
|||
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
|
||||
)
|
||||
|
||||
from scripts.pipeline.log_utils import attach_pipeline_log
|
||||
attach_pipeline_log("scrape_recipes")
|
||||
|
||||
scrape(args.slugs, args.out, resume=args.resume)
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue