Compare commits
6 commits
| Author | SHA1 | Date | |
|---|---|---|---|
| cf807179f5 | |||
| 0c200f3148 | |||
| 21a0664961 | |||
| a9ab996bcc | |||
| 56f942b3fd | |||
| 84636bcdaf |
9 changed files with 972 additions and 43 deletions
|
|
@ -8,7 +8,7 @@
|
||||||
|
|
||||||
[](#license)
|
[](#license)
|
||||||
[](https://git.opensourcesolarpunk.com/Circuit-Forge/kiwi/actions)
|
[](https://git.opensourcesolarpunk.com/Circuit-Forge/kiwi/actions)
|
||||||
[](https://git.opensourcesolarpunk.com/Circuit-Forge/kiwi/releases)
|
[](https://git.opensourcesolarpunk.com/Circuit-Forge/kiwi/releases)
|
||||||
|
|
||||||
[Documentation](https://docs.circuitforge.tech/kiwi) · [Live demo](https://menagerie.circuitforge.tech/kiwi) · [circuitforge.tech](https://circuitforge.tech)
|
[Documentation](https://docs.circuitforge.tech/kiwi) · [Live demo](https://menagerie.circuitforge.tech/kiwi) · [circuitforge.tech](https://circuitforge.tech)
|
||||||
|
|
||||||
|
|
@ -113,4 +113,6 @@ Kiwi uses a split license:
|
||||||
- **Discovery and inventory pipeline** (barcode scan, expiry tracking, pantry CRUD, CSV export, recipe browser): [MIT](LICENSE-MIT)
|
- **Discovery and inventory pipeline** (barcode scan, expiry tracking, pantry CRUD, CSV export, recipe browser): [MIT](LICENSE-MIT)
|
||||||
- **AI features** (receipt OCR, LLM recipe suggestions, style auto-classifier): [BSL 1.1](LICENSE-BSL) — free for personal non-commercial self-hosting; commercial use or SaaS re-hosting requires a paid license. Converts to MIT after 4 years.
|
- **AI features** (receipt OCR, LLM recipe suggestions, style auto-classifier): [BSL 1.1](LICENSE-BSL) — free for personal non-commercial self-hosting; commercial use or SaaS re-hosting requires a paid license. Converts to MIT after 4 years.
|
||||||
|
|
||||||
|
Humans own design, architecture, code review, testing, and verification. LLMs are part of our development workflow. [Our positions on LLM use →](https://circuitforge.tech/positions)
|
||||||
|
|
||||||
Privacy · Safety · Accessibility — co-equal, non-negotiable across all CircuitForge products.
|
Privacy · Safety · Accessibility — co-equal, non-negotiable across all CircuitForge products.
|
||||||
|
|
|
||||||
218
scripts/pipeline/ingest_purplecarrot.py
Normal file
218
scripts/pipeline/ingest_purplecarrot.py
Normal file
|
|
@ -0,0 +1,218 @@
|
||||||
|
"""Ingest Purple Carrot scraped recipes into the Kiwi corpus database.
|
||||||
|
|
||||||
|
Reads recipes_purplecarrot_live.parquet (output of scrape_live.py) and
|
||||||
|
upserts into the shared recipes table, setting source='purplecarrot' and
|
||||||
|
using the recipe slug as the external_id (prefixed pc_).
|
||||||
|
|
||||||
|
Run after each weekly_harvest.sh scrape:
|
||||||
|
|
||||||
|
conda run -n cf python3 scripts/pipeline/ingest_purplecarrot.py \
|
||||||
|
[--db /Library/Assets/kiwi/kiwi.db] \
|
||||||
|
[--parquet /Library/Assets/kiwi/pipeline/recipes_purplecarrot_live.parquet]
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import json
|
||||||
|
import sqlite3
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import math
|
||||||
|
import re
|
||||||
|
|
||||||
|
import pandas as pd
|
||||||
|
|
||||||
|
# ── Helpers (inlined from build_recipe_index to avoid cross-module import) ─────
|
||||||
|
|
||||||
|
_MEASURE_PATTERN = re.compile(
|
||||||
|
r"^\d[\d\s/¼½¾⅓⅔]*\s*(cup|tbsp|tsp|oz|lb|g|kg|ml|l|clove|slice|piece|can|pkg|package|bunch|head|stalk|sprig|pinch|dash|to taste|as needed)s?\b",
|
||||||
|
re.IGNORECASE,
|
||||||
|
)
|
||||||
|
_LEAD_NUMBER = re.compile(r"^\d[\d\s/¼½¾⅓⅔]*\s*")
|
||||||
|
_TRAILING_QUALIFIER = re.compile(
|
||||||
|
r"\s*(to taste|as needed|or more|or less|optional|if desired|if needed)\s*$",
|
||||||
|
re.IGNORECASE,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _float_or_none(val: object) -> float | None:
|
||||||
|
try:
|
||||||
|
v = float(val) # type: ignore[arg-type]
|
||||||
|
return v if v > 0 else None
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _safe_list(val: object) -> list:
|
||||||
|
if val is None:
|
||||||
|
return []
|
||||||
|
if isinstance(val, float) and math.isnan(val):
|
||||||
|
return []
|
||||||
|
if isinstance(val, list):
|
||||||
|
return val
|
||||||
|
# Parquet often deserializes list columns as numpy arrays
|
||||||
|
try:
|
||||||
|
import numpy as np
|
||||||
|
if isinstance(val, np.ndarray):
|
||||||
|
return val.tolist()
|
||||||
|
except ImportError:
|
||||||
|
pass
|
||||||
|
return []
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_ingredient_names(raw_list: list[str]) -> list[str]:
    """Reduce raw ingredient lines to bare, lower-cased ingredient names.

    Each line has, in order, these removed: a leading quantity-plus-unit
    (``_MEASURE_PATTERN``), any remaining leading number, parenthesised text,
    everything from the first comma onward, and a trailing qualifier
    (``_TRAILING_QUALIFIER``). Surrounding spaces and ``-.,`` are then
    trimmed. Results of one character or less are dropped.
    """
    def _clean(raw: str) -> str:
        text = _MEASURE_PATTERN.sub("", raw.lower().strip())
        text = _LEAD_NUMBER.sub("", text)
        text = re.sub(r"\(.*?\)", "", text)
        text = re.sub(r",.*$", "", text)
        return _TRAILING_QUALIFIER.sub("", text).strip(" -.,")

    return [name for name in map(_clean, raw_list) if len(name) > 1]
|
||||||
|
|
||||||
|
|
||||||
|
def _compute_element_coverage(profiles: list[dict]) -> dict[str, float]:
|
||||||
|
counts: dict[str, int] = {}
|
||||||
|
for p in profiles:
|
||||||
|
for elem in p.get("elements", []):
|
||||||
|
counts[elem] = counts.get(elem, 0) + 1
|
||||||
|
if not profiles:
|
||||||
|
return {}
|
||||||
|
return {e: round(c / len(profiles), 3) for e, c in counts.items()}
|
||||||
|
|
||||||
|
# ── Config ─────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
DEFAULT_DB = Path("/Library/Assets/kiwi/kiwi.db")
|
||||||
|
DEFAULT_PARQUET = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot_live.parquet")
|
||||||
|
|
||||||
|
|
||||||
|
# ── Ingest ─────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _clean_text(val: object) -> str:
    """Return *val* as a stripped string; None/NaN/NA cells become ``""``."""
    # str(None) / str(float("nan")) would otherwise yield "None" / "nan",
    # which are truthy and slip past the emptiness checks in ingest().
    if pd.api.types.is_scalar(val) and pd.isna(val):
        return ""
    return str(val).strip()


def ingest(db_path: Path, parquet_path: Path) -> None:
    """Upsert scraped Purple Carrot recipes into the ``recipes`` table.

    Rows are keyed on ``external_id = "pc_<slug>"``: an existing row is
    updated in place, otherwise a new row is inserted with
    ``source='purplecarrot'``. Rows without a usable slug or title are
    skipped and counted. A single commit is issued after all rows have been
    processed.

    Args:
        db_path: SQLite database holding ``recipes`` and
            ``ingredient_profiles``.
        parquet_path: Parquet file of scraped recipes.
    """
    df = pd.read_parquet(parquet_path)

    # Filter to rows with full recipe data. The explicit `== True` is
    # deliberate: missing (NaN) flags compare False and are dropped.
    if "HasFullRecipe" in df.columns:
        df = df[df["HasFullRecipe"] == True].copy()  # noqa: E712

    if df.empty:
        print("No full recipes found in parquet — nothing to ingest.")
        return

    print(f"Ingesting {len(df)} Purple Carrot recipes into {db_path} …")

    conn = sqlite3.connect(db_path)
    try:
        conn.execute("PRAGMA journal_mode=WAL")

        # Pre-load ingredient element profiles for coverage calculation
        profile_index: dict[str, list[str]] = {}
        bad_profiles = 0
        for name, elements_json in conn.execute(
            "SELECT name, elements FROM ingredient_profiles"
        ):
            try:
                elements = json.loads(elements_json)
            except (TypeError, ValueError):
                # TypeError: NULL column; ValueError: malformed JSON.
                bad_profiles += 1
                continue
            if isinstance(elements, list):
                profile_index[name] = elements
            else:
                bad_profiles += 1
        if bad_profiles:
            print(f"WARNING: ignored {bad_profiles} ingredient profiles with unreadable elements")

        inserted = updated = skipped = 0

        for _, row in df.iterrows():
            slug = _clean_text(row.get("Slug"))
            title = _clean_text(row.get("Name"))[:500]
            if not slug or not title:
                skipped += 1
                continue

            external_id = f"pc_{slug}"

            raw_ingredients = [str(i) for i in _safe_list(row.get("RecipeIngredientParts", []))]
            directions = [str(d) for d in _safe_list(row.get("RecipeInstructions", []))]

            ingredient_names = _extract_ingredient_names(raw_ingredients)
            profiles = [
                {"elements": profile_index[n]}
                for n in ingredient_names if n in profile_index
            ]
            coverage = _compute_element_coverage(profiles)

            # Keywords: merge scraped tags with allergen info
            kw_raw = list(_safe_list(row.get("Keywords", [])))
            allergens = _clean_text(row.get("Allergens"))
            if allergens:
                kw_raw += [f"allergen:{a.strip()}" for a in allergens.split(",") if a.strip()]
            keywords_json = json.dumps(kw_raw)

            # Check if already present (same external_id)
            existing = conn.execute(
                "SELECT id FROM recipes WHERE external_id = ?", (external_id,)
            ).fetchone()

            # Order matches the column lists of both statements below.
            params = (
                title,
                json.dumps(raw_ingredients),
                json.dumps(ingredient_names),
                json.dumps(directions),
                "meal-kit",          # category
                keywords_json,
                _float_or_none(row.get("Calories")),
                _float_or_none(row.get("FatContent")),
                _float_or_none(row.get("ProteinContent")),
                None,                # sodium_mg — not scraped
                json.dumps(coverage),
                None,                # sugar_g — not scraped
                _float_or_none(row.get("CarbohydrateContent")),
                _float_or_none(row.get("FiberContent")),
                2.0,                 # servings — PC meal kits are 2-serving by default
                0,                   # nutrition_estimated — PC provides real data
            )

            if existing:
                conn.execute("""
                    UPDATE recipes
                    SET title=?, ingredients=?, ingredient_names=?, directions=?,
                        category=?, keywords=?, calories=?, fat_g=?, protein_g=?,
                        sodium_mg=?, element_coverage=?,
                        sugar_g=?, carbs_g=?, fiber_g=?, servings=?, nutrition_estimated=?
                    WHERE external_id=?
                """, params + (external_id,))
                updated += 1
            else:
                conn.execute("""
                    INSERT INTO recipes
                        (external_id, source, title, ingredients, ingredient_names,
                         directions, category, keywords, calories, fat_g, protein_g,
                         sodium_mg, element_coverage,
                         sugar_g, carbs_g, fiber_g, servings, nutrition_estimated)
                    VALUES (?, 'purplecarrot', ?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
                """, (external_id,) + params)
                inserted += 1

        conn.commit()
    finally:
        conn.close()

    print(f"Done — {inserted} inserted, {updated} updated")
    if skipped:
        print(f"Skipped {skipped} rows with no slug or title")
|
||||||
|
|
||||||
|
|
||||||
|
# ── Main ───────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def main() -> None:
    """Parse ``--db`` / ``--parquet`` and run the ingest.

    Exits with status 1 when the parquet file does not exist.
    """
    cli = argparse.ArgumentParser()
    for flag, default in (("--db", DEFAULT_DB), ("--parquet", DEFAULT_PARQUET)):
        cli.add_argument(flag, type=Path, default=default)
    opts = cli.parse_args()

    parquet_path: Path = opts.parquet
    if parquet_path.exists():
        ingest(opts.db, parquet_path)
        return

    print(f"ERROR: parquet not found at {parquet_path}")
    raise SystemExit(1)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
68
scripts/pipeline/log_utils.py
Normal file
68
scripts/pipeline/log_utils.py
Normal file
|
|
@ -0,0 +1,68 @@
|
||||||
|
"""
|
||||||
|
Pipeline logging utility.
|
||||||
|
|
||||||
|
Adds a structured JSON FileHandler to the root logger so every pipeline
|
||||||
|
script automatically writes machine-readable logs to the shared datastore
|
||||||
|
at /Library/Assets/logs/pipeline/. Avocet ingests these for Turnstone
|
||||||
|
logreading training (kiwi#141 / avocet#67).
|
||||||
|
|
||||||
|
Usage (add near the top of main() after logging.basicConfig):
|
||||||
|
|
||||||
|
from scripts.pipeline.log_utils import attach_pipeline_log
|
||||||
|
attach_pipeline_log("scrape_recipes")
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
PIPELINE_LOG_DIR = Path(
|
||||||
|
os.environ.get("PIPELINE_LOG_DIR", "/Library/Assets/logs/pipeline")
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class _JsonFormatter(logging.Formatter):
|
||||||
|
def format(self, record: logging.LogRecord) -> str:
|
||||||
|
payload: dict = {
|
||||||
|
"ts": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
|
||||||
|
"level": record.levelname,
|
||||||
|
"logger": record.name,
|
||||||
|
"msg": record.getMessage(),
|
||||||
|
}
|
||||||
|
if record.exc_info:
|
||||||
|
payload["exc"] = self.formatException(record.exc_info)
|
||||||
|
# Any extra kwargs passed via logger.info("...", extra={...})
|
||||||
|
standard = {
|
||||||
|
"name", "msg", "args", "levelname", "levelno", "pathname",
|
||||||
|
"filename", "module", "exc_info", "exc_text", "stack_info",
|
||||||
|
"lineno", "funcName", "created", "msecs", "relativeCreated",
|
||||||
|
"thread", "threadName", "processName", "process", "message",
|
||||||
|
"taskName",
|
||||||
|
}
|
||||||
|
extra = {k: v for k, v in record.__dict__.items() if k not in standard}
|
||||||
|
if extra:
|
||||||
|
payload["extra"] = extra
|
||||||
|
return json.dumps(payload)
|
||||||
|
|
||||||
|
|
||||||
|
def attach_pipeline_log(script_name: str) -> Path:
    """Route root-logger output to a JSONL file for this run.

    Creates ``PIPELINE_LOG_DIR`` if needed, adds a DEBUG-level file handler
    using ``_JsonFormatter`` to the root logger, logs the file location, and
    returns the path ``<script_name>_<UTC timestamp>.jsonl``.
    """
    PIPELINE_LOG_DIR.mkdir(parents=True, exist_ok=True)

    stamp = datetime.now(tz=timezone.utc).strftime("%Y%m%dT%H%M%S")
    log_path = PIPELINE_LOG_DIR / f"{script_name}_{stamp}.jsonl"

    json_handler = logging.FileHandler(log_path, encoding="utf-8")
    json_handler.setFormatter(_JsonFormatter())
    json_handler.setLevel(logging.DEBUG)
    logging.getLogger().addHandler(json_handler)

    announcer = logging.getLogger(__name__)
    announcer.info("Pipeline log: %s", log_path, extra={"script": script_name})
    return log_path
|
||||||
120
scripts/pipeline/purple_carrot/discover_current_menu.py
Normal file
120
scripts/pipeline/purple_carrot/discover_current_menu.py
Normal file
|
|
@ -0,0 +1,120 @@
|
||||||
|
"""Discover Purple Carrot's current weekly menu recipe slugs.
|
||||||
|
|
||||||
|
The main /plant-based-recipes listing page always renders the current week's
|
||||||
|
menu as server-side HTML. This script pulls those slugs and writes them to a
|
||||||
|
parquet that can be passed directly to scrape_live.py via --slugs-from.
|
||||||
|
|
||||||
|
Run weekly (e.g. via cron) to accumulate new recipes as the menu rotates.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
conda run -n cf python3 scripts/pipeline/purple_carrot/discover_current_menu.py \
|
||||||
|
[--out /Library/Assets/kiwi/pipeline/recipes_purplecarrot_menu.parquet]
|
||||||
|
|
||||||
|
Then scrape:
|
||||||
|
conda run -n cf python3 scripts/pipeline/purple_carrot/scrape_live.py \
|
||||||
|
--slugs-from /Library/Assets/kiwi/pipeline/recipes_purplecarrot_menu.parquet \
|
||||||
|
--out /Library/Assets/kiwi/pipeline/recipes_purplecarrot_live.parquet \
|
||||||
|
--resume
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import re
|
||||||
|
import sys
|
||||||
|
from datetime import date
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pandas as pd
|
||||||
|
import requests
|
||||||
|
from bs4 import BeautifulSoup
|
||||||
|
|
||||||
|
# ── Config ─────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
LISTING_URL = "https://www.purplecarrot.com/plant-based-recipes"
|
||||||
|
BASE_URL = "https://www.purplecarrot.com/recipe/{slug}"
|
||||||
|
|
||||||
|
DEFAULT_OUT = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot_menu.parquet")
|
||||||
|
|
||||||
|
HEADERS = {
|
||||||
|
"User-Agent": (
|
||||||
|
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
|
||||||
|
"(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
|
||||||
|
),
|
||||||
|
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
|
||||||
|
"Accept-Language": "en-US,en;q=0.5",
|
||||||
|
}
|
||||||
|
|
||||||
|
RECIPE_HREF_RE = re.compile(r"/recipe/([^?#]+)")
|
||||||
|
|
||||||
|
|
||||||
|
# ── Main ───────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def discover_current_slugs() -> list[str]:
    """Fetch the listing page and return unique recipe slugs from the current menu.

    Returns:
        Slugs in first-seen page order, or ``[]`` when the request fails or
        the page does not answer with HTTP 200 (the reason is printed to
        stderr).
    """
    try:
        resp = requests.get(LISTING_URL, headers=HEADERS, timeout=15)
    except requests.RequestException as exc:
        # Timeouts / DNS / connection errors: report and let the caller's
        # "no slugs" path handle it instead of dying with a traceback.
        print(f"ERROR: could not fetch listing page: {exc}", file=sys.stderr)
        return []
    if resp.status_code != 200:
        print(f"ERROR: listing page returned HTTP {resp.status_code}", file=sys.stderr)
        return []

    soup = BeautifulSoup(resp.text, "html.parser")
    # A dict serves as an insertion-ordered set.
    slugs: dict[str, None] = {}
    for a in soup.find_all("a", href=RECIPE_HREF_RE):
        m = RECIPE_HREF_RE.search(a["href"])
        if not m:
            continue
        # The pattern captures up to '?' or '#', so ".../recipe/foo/" would
        # give "foo/"; strip slashes so it de-duplicates against "foo".
        slug = m.group(1).strip("/")
        if slug:
            slugs.setdefault(slug)
    return list(slugs)
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
    """Discover this week's menu slugs and merge them into the ``--out`` parquet.

    Exits with status 1 when no slugs are found. Previously saved slugs are
    kept; on a duplicate ``Slug`` the earlier row wins.
    """
    import argparse

    cli = argparse.ArgumentParser()
    cli.add_argument("--out", type=Path, default=DEFAULT_OUT)
    out_path: Path = cli.parse_args().out

    print(f"Fetching current menu from {LISTING_URL} …")
    slugs = discover_current_slugs()
    if not slugs:
        print("No slugs found — the listing page may have changed structure or blocked the request.")
        sys.exit(1)

    discovered = date.today().isoformat()
    menu = pd.DataFrame([
        {
            "Slug": slug,
            "SourceURL": BASE_URL.format(slug=slug),
            "Source": "purplecarrot_menu",
            "DiscoveredDate": discovered,
        }
        for slug in slugs
    ])

    out_path.parent.mkdir(parents=True, exist_ok=True)
    if out_path.exists():
        # Merge with any existing menu parquet (accumulate weeks)
        previous = pd.read_parquet(out_path)
        menu = pd.concat([previous, menu], ignore_index=True)
        menu = menu.drop_duplicates(subset=["Slug"], keep="first")
    menu.to_parquet(out_path, index=False)

    print(f"Found {len(slugs)} current-menu slugs this week:")
    for slug in slugs:
        print(f" {slug}")
    print("\n".join([
        f"\nSaved {len(menu)} total slugs (accumulated) to {out_path}",
        "\nTo scrape full recipes:",
        " conda run -n cf python3 scripts/pipeline/purple_carrot/scrape_live.py \\",
        f" --slugs-from {out_path} \\",
        " --out /Library/Assets/kiwi/pipeline/recipes_purplecarrot_live.parquet \\",
        " --resume",
    ]))
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
218
scripts/pipeline/purple_carrot/discover_slugs_categories.py
Normal file
218
scripts/pipeline/purple_carrot/discover_slugs_categories.py
Normal file
|
|
@ -0,0 +1,218 @@
|
||||||
|
"""Discover Purple Carrot recipe slugs by crawling all recipe-category listing pages.
|
||||||
|
|
||||||
|
The site serves full server-rendered HTML for category pages, paginated via
|
||||||
|
?page=N. Each page loads 18 recipe cards. This script crawls every category
|
||||||
|
across all pages and writes a deduplicated slug inventory.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
conda run -n cf python3 scripts/pipeline/purple_carrot/discover_slugs_categories.py \
|
||||||
|
[--out /Library/Assets/kiwi/pipeline/recipes_purplecarrot_slugs.parquet] \
|
||||||
|
[--delay 2.0] \
|
||||||
|
[--max-pages 50] # safety cap per category (comfort-foods has ~18)
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import re
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import pandas as pd
|
||||||
|
import requests
|
||||||
|
from bs4 import BeautifulSoup
|
||||||
|
|
||||||
|
# ── Config ─────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
BASE = "https://www.purplecarrot.com"
|
||||||
|
|
||||||
|
# All known category slugs (from /plant-based-recipes nav)
|
||||||
|
CATEGORIES: list[str] = [
|
||||||
|
"comfort-foods",
|
||||||
|
"family-friendly",
|
||||||
|
"healthy-desserts",
|
||||||
|
"holiday-recipes",
|
||||||
|
"quick-and-easy",
|
||||||
|
"party-foods",
|
||||||
|
"seasonal-menu",
|
||||||
|
"spring-recipes",
|
||||||
|
"summer-recipes",
|
||||||
|
"fall-recipes",
|
||||||
|
"winter-recipes",
|
||||||
|
"african",
|
||||||
|
"american",
|
||||||
|
"asian",
|
||||||
|
"comfort",
|
||||||
|
"french",
|
||||||
|
"indian",
|
||||||
|
"italian",
|
||||||
|
"mediterranean",
|
||||||
|
"mexican",
|
||||||
|
"middle-eastern",
|
||||||
|
"soups",
|
||||||
|
"salads",
|
||||||
|
"bowls",
|
||||||
|
"pasta",
|
||||||
|
"sandwiches-wraps",
|
||||||
|
"tacos",
|
||||||
|
"breakfast",
|
||||||
|
"snacks-sides",
|
||||||
|
]
|
||||||
|
|
||||||
|
DEFAULT_OUT = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot_slugs.parquet")
|
||||||
|
EXISTING_PARQUET = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot.parquet")
|
||||||
|
|
||||||
|
RECIPE_LINK_SELECTOR = "a.c-recipe__title"
|
||||||
|
SLUG_RE = re.compile(r"/recipe/([^?#]+)")
|
||||||
|
|
||||||
|
HEADERS = {
|
||||||
|
"User-Agent": (
|
||||||
|
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
|
||||||
|
"(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
|
||||||
|
),
|
||||||
|
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
|
||||||
|
"Accept-Language": "en-US,en;q=0.5",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# ── Helpers ────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _fetch_html(url: str, session: requests.Session) -> str | None:
    """Return the body of *url* on HTTP 200, otherwise ``None``.

    A 404 is treated as the expected end of pagination and stays silent;
    any other status, and any exception raised while fetching, is printed.
    """
    try:
        resp = session.get(url, headers=HEADERS, timeout=15)
        status = resp.status_code
        if status == 200:
            return resp.text
        if status != 404:
            print(f" HTTP {status} — {url}")
    except Exception as exc:
        print(f" ERROR fetching {url}: {exc}")
    return None
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_slugs(html: str) -> list[str]:
    """Return the recipe slug of every recipe-title link in *html*.

    Links are selected with ``RECIPE_LINK_SELECTOR``; anchors whose ``href``
    does not match ``SLUG_RE`` are ignored. Duplicates are kept, in document
    order.
    """
    anchors = BeautifulSoup(html, "html.parser").select(RECIPE_LINK_SELECTOR)
    matches = (SLUG_RE.search(anchor.get("href", "")) for anchor in anchors)
    return [match.group(1) for match in matches if match]
|
||||||
|
|
||||||
|
|
||||||
|
def _get_category_total(html: str) -> int | None:
|
||||||
|
"""Try to parse the recipe count shown on the category page (e.g. '319 Recipes')."""
|
||||||
|
m = re.search(r"(\d+)\s+Recipes?\b", html)
|
||||||
|
return int(m.group(1)) if m else None
|
||||||
|
|
||||||
|
|
||||||
|
def _discover_category(
    category: str,
    session: requests.Session,
    delay: float,
    max_pages: int,
    page_size: int = 18,
) -> tuple[list[str], int]:
    """Crawl the listing pages of one category.

    Args:
        category: Category slug under ``/recipe-categories/``.
        session: HTTP session used for every page request.
        delay: Seconds to sleep between page requests.
        max_pages: Upper bound on pages requested.
        page_size: Recipe cards on a full page; a shorter page is treated
            as the last one.

    Returns:
        ``(slugs, pages)`` where *slugs* may contain duplicates and *pages*
        is the number of pages that actually contributed slugs.
    """
    slugs: list[str] = []
    pages_with_slugs = 0

    for page_num in range(1, max_pages + 1):
        url = f"{BASE}/recipe-categories/{category}"
        if page_num > 1:
            url += f"?page={page_num}"

        html = _fetch_html(url, session)
        if html is None:
            break  # 404 or error = past the end

        page_slugs = _extract_slugs(html)
        if not page_slugs:
            # Show total if we got a page but no links (category slug may be wrong)
            if page_num == 1:
                total = _get_category_total(html)
                if total is not None:
                    print(f" page 1 loaded (total={total}) but 0 recipe links — selector may need updating")
            break

        slugs.extend(page_slugs)
        pages_with_slugs += 1

        # Print progress; the category total is only looked up on page 1.
        total_hint = _get_category_total(html) if page_num == 1 else None
        total_str = f" / {total_hint}" if total_hint else ""
        print(f" page {page_num}: +{len(page_slugs)} slugs ({len(slugs)}{total_str} cumulative)")

        if len(page_slugs) < page_size:
            # Short page = last page
            break

        time.sleep(delay)

    return slugs, pages_with_slugs
|
||||||
|
|
||||||
|
|
||||||
|
# ── Main ───────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def main() -> None:
    """Crawl the category listings and write a de-duplicated slug inventory.

    Slugs found on category pages are tagged with the first category they
    were seen in. Slugs present only in ``EXISTING_PARQUET`` are appended
    with category ``"wayback"``. Nothing is written when the crawl finds no
    records.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--out", type=Path, default=DEFAULT_OUT)
    parser.add_argument("--delay", type=float, default=2.0,
                        help="Seconds between page requests")
    parser.add_argument("--max-pages", type=int, default=50,
                        help="Safety cap on pages per category")
    parser.add_argument("--categories", nargs="*",
                        help="Crawl only these category slugs (default: all)")
    args = parser.parse_args()

    categories = args.categories or CATEGORIES

    # Seed with any slugs from the Wayback parquet
    known_slugs: set[str] = set()
    if EXISTING_PARQUET.exists():
        df_wb = pd.read_parquet(EXISTING_PARQUET)
        known_slugs = set(df_wb["Slug"].dropna().tolist())
        print(f"Seeded with {len(known_slugs)} slugs from Wayback parquet")

    all_records: list[dict[str, Any]] = []

    # The context manager closes the session's pooled connections on every
    # exit path, including an exception mid-crawl.
    with requests.Session() as session:
        for category in categories:
            print(f"\n[{category}]")
            cat_slugs, pages = _discover_category(category, session, args.delay, args.max_pages)
            all_records.extend(
                {"Slug": slug, "Category": category, "Source": "purplecarrot_category"}
                for slug in cat_slugs
            )
            print(f" → {len(cat_slugs)} slugs across ~{pages} pages")
            time.sleep(args.delay)

    if not all_records:
        print("\nNo records found — check that categories are correct and the site is accessible")
        return

    # Deduplicate keeping first category encountered
    df_new = pd.DataFrame(all_records)
    df_new = df_new.drop_duplicates(subset=["Slug"], keep="first")

    # Also include Wayback slugs not already in the new set
    wb_only = known_slugs - set(df_new["Slug"].tolist())
    if wb_only:
        # sorted(): set iteration order varies between runs; keep the
        # output file reproducible.
        df_wb_extra = pd.DataFrame([
            {"Slug": s, "Category": "wayback", "Source": "purplecarrot_wayback"}
            for s in sorted(wb_only)
        ])
        df_new = pd.concat([df_new, df_wb_extra], ignore_index=True)

    args.out.parent.mkdir(parents=True, exist_ok=True)
    df_new.to_parquet(args.out, index=False)

    new_count = len(df_new)
    cat_count = int((df_new["Source"] == "purplecarrot_category").sum())
    print(f"\nDone — {new_count} total slugs saved to {args.out}")
    print(f" {cat_count} from category pages, {new_count - cat_count} from Wayback only")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
|
@ -31,7 +31,7 @@ import requests
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
CDX_BASE = "http://web.archive.org/cdx/search/cdx"
|
CDX_BASE = "https://web.archive.org/cdx/search/cdx"
|
||||||
WB_BASE = "https://web.archive.org/web"
|
WB_BASE = "https://web.archive.org/web"
|
||||||
PC_HOST = "www.purplecarrot.com"
|
PC_HOST = "www.purplecarrot.com"
|
||||||
|
|
||||||
|
|
@ -291,6 +291,9 @@ def main() -> None:
|
||||||
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
|
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
from scripts.pipeline.log_utils import attach_pipeline_log
|
||||||
|
attach_pipeline_log("discover_wayback")
|
||||||
|
|
||||||
discover(args.out)
|
discover(args.out)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
250
scripts/pipeline/purple_carrot/scrape_live.py
Normal file
250
scripts/pipeline/purple_carrot/scrape_live.py
Normal file
|
|
@ -0,0 +1,250 @@
|
||||||
|
"""Playwright scraper for live purplecarrot.com recipe pages.
|
||||||
|
|
||||||
|
Uses the slug inventory already in recipes_purplecarrot.parquet and fills in
|
||||||
|
the missing ingredients/instructions by hitting the live site directly.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
conda run -n cf python3 scripts/pipeline/purple_carrot/scrape_live.py \
|
||||||
|
[--out /Library/Assets/kiwi/pipeline/recipes_purplecarrot_live.parquet] \
|
||||||
|
[--delay 2.5] \
|
||||||
|
[--limit 20]
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import json
|
||||||
|
import re
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import pandas as pd
|
||||||
|
from playwright.sync_api import sync_playwright, Page, TimeoutError as PWTimeout
|
||||||
|
|
||||||
|
# ── Config ─────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
BASE_URL = "https://www.purplecarrot.com/recipe/{slug}"
|
||||||
|
DEFAULT_OUT = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot_live.parquet")
|
||||||
|
EXISTING_PARQUET = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot.parquet")
|
||||||
|
|
||||||
|
RENDER_WAIT_MS = 2500 # JS render settle time
|
||||||
|
NAV_TIMEOUT_MS = 20_000
|
||||||
|
|
||||||
|
|
||||||
|
# ── Page parser ────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _text(page: Page, selector: str) -> str:
|
||||||
|
el = page.query_selector(selector)
|
||||||
|
return el.inner_text().strip() if el else ""
|
||||||
|
|
||||||
|
|
||||||
|
def _texts(page: Page, selector: str) -> list[str]:
|
||||||
|
return [el.inner_text().strip() for el in page.query_selector_all(selector)]
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_steps(inst_block: str) -> list[str]:
    """Group instruction lines into steps delimited by bare step-number lines."""
    steps: list[str] = []
    current: list[str] = []
    for line in inst_block.splitlines():
        line = line.strip()
        if not line:
            continue
        if re.fullmatch(r"\d+", line):
            # A line holding only a number opens the next step.
            if current:
                steps.append(" ".join(current))
                current = []
        elif line.startswith("CULINARY NOTES"):
            break
        else:
            current.append(line)
    if current:
        steps.append(" ".join(current))
    return steps


def _parse_recipe(page: Page, slug: str, source_url: str) -> dict[str, Any] | None:
    """Extract structured recipe data from the rendered page.

    Works on the page's visible body text: the blocks between the
    "Ingredients" and "Instructions" headings and the "Shop" footer
    sentinel, plus regex matches for nutrition, allergens and feature tags.

    Args:
        page: Page already navigated to the recipe.
        slug: Recipe slug; it must appear in ``page.url``.
        source_url: URL recorded in the result.

    Returns:
        A record dict, or ``None`` when the page is a "Page Not Found"
        page, the URL no longer contains *slug*, or no "Ingredients"
        heading was rendered.
    """
    body = page.inner_text("body")

    # Abort if we've been bounced to a generic listing / 404
    if "Page Not Found" in body or slug not in page.url:
        return None

    # ── Title ────────────────────────────────────────────────────────────
    title = (_text(page, "h1") or _text(page, "[class*='recipe-title']")).strip()
    if not title:
        # Fallback: last non-empty line before "Ingredients". The list may
        # be empty when only whitespace precedes the heading, so guard the
        # [-1] lookup.
        idx = body.find("Ingredients\n")
        preceding = body[:idx].strip().splitlines() if idx > 0 else []
        title = preceding[-1].strip() if preceding else ""

    # ── Ingredients / Instructions via body text ─────────────────────────
    ing_marker = "\nIngredients\n"
    inst_marker = "\nInstructions\n"
    ing_start = body.find(ing_marker)
    inst_start = body.find(inst_marker)
    footer_start = body.find("\nShop\n")  # footer sentinel

    if ing_start == -1:
        return None  # page didn't render recipe content

    raw_ingredients: list[str] = []
    raw_instructions: list[str] = []

    if inst_start != -1:
        ing_block = body[ing_start + len(ing_marker):inst_start]
        raw_ingredients = [ln.strip() for ln in ing_block.splitlines() if ln.strip()]

        end = footer_start if footer_start > inst_start else len(body)
        inst_block = body[inst_start + len(inst_marker):end].strip()
        raw_instructions = _parse_steps(inst_block)

    # ── Nutrition ────────────────────────────────────────────────────────
    def _extract_num(pattern: str) -> float | None:
        m = re.search(pattern, body)
        return float(m.group(1)) if m else None

    cal = _extract_num(r"(\d+)\s*CAL")
    fat = _extract_num(r"(\d+(?:\.\d+)?)g\s*FAT")
    carbs = _extract_num(r"(\d+(?:\.\d+)?)g\s*CARBS")
    prot = _extract_num(r"(\d+(?:\.\d+)?)g\s*PROTEIN")
    fiber = _extract_num(r"(\d+(?:\.\d+)?)g\s*FIBER")

    # ── Allergens / tags ─────────────────────────────────────────────────
    allergen_m = re.search(r"Allergens?:\s*([^\n]+)", body)
    allergens = allergen_m.group(1).strip() if allergen_m else ""

    # Feature tags like HIGH-PROTEIN, QUICK, etc. appear before Ingredients.
    # dict.fromkeys drops repeats while keeping first-seen order.
    pre_ing = body[:ing_start]
    tags = list(dict.fromkeys(
        re.findall(r"\b(HIGH-PROTEIN|QUICK|SPICY|LOW[\-\s]CALORIE|VEGAN|FAMILY\s+FRIENDLY)\b", pre_ing)
    ))

    return {
        "Slug": slug,
        "Name": title,
        "SourceURL": source_url,
        "Source": "purplecarrot_live",
        "RecipeIngredientParts": raw_ingredients,
        "RecipeInstructions": raw_instructions,
        "Calories": cal,
        "FatContent": fat,
        "CarbohydrateContent": carbs,
        "ProteinContent": prot,
        "FiberContent": fiber,
        "Allergens": allergens,
        "Keywords": tags,
        "HasFullRecipe": bool(raw_ingredients and raw_instructions),
    }
|
||||||
|
|
||||||
|
|
||||||
|
# ── Main ───────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def main() -> None:
    """Scrape live recipe pages for a slug inventory and save them as parquet.

    Command-line flags:
        --out         Destination parquet (default: DEFAULT_OUT).
        --delay       Seconds to wait between page loads.
        --limit       Scrape at most N slugs that still need scraping (0 = all).
        --resume      Skip slugs already present in --out and append to it.
        --slugs-from  Parquet holding the slug inventory (must have a "Slug"
                      column; "SourceURL" is optional).

    Each slug is loaded in its own browser context, parsed with
    ``_parse_recipe``, and collected in memory; the parquet is written once
    at the end, merged with the previous output when resuming.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--out", type=Path, default=DEFAULT_OUT)
    parser.add_argument("--delay", type=float, default=2.5,
                        help="Seconds between requests (be polite)")
    parser.add_argument("--limit", type=int, default=0,
                        help="Stop after N slugs (0 = all)")
    parser.add_argument("--resume", action="store_true",
                        help="Skip slugs already present in --out")
    parser.add_argument("--slugs-from", type=Path, default=None,
                        help="Read slug inventory from this parquet instead of the default Wayback one")
    args = parser.parse_args()

    # Load slug inventory — either from a custom parquet or the default Wayback run
    slugs_parquet = args.slugs_from if args.slugs_from else EXISTING_PARQUET
    df_existing = pd.read_parquet(slugs_parquet)
    slugs = df_existing["Slug"].dropna().unique().tolist()

    # SourceURL may not be present in custom parquets — fall back to
    # constructing it from the slug. Null/empty cells are left out of the map
    # so the per-slug lookup below falls back to the live URL instead of
    # writing NaN into the output.
    if "SourceURL" in df_existing.columns:
        source_urls = {
            slug: src
            for slug, src in zip(df_existing["Slug"], df_existing["SourceURL"])
            if isinstance(src, str) and src
        }
    else:
        source_urls = {s: BASE_URL.format(slug=s) for s in slugs}

    # Resume support: remember the previous output so it is read only once
    # and can be merged with the new rows when saving.
    df_prev = None
    done_slugs: set[str] = set()
    if args.resume and args.out.exists():
        df_prev = pd.read_parquet(args.out)
        done_slugs = set(df_prev["Slug"].dropna().tolist())
        print(f"Resuming — {len(done_slugs)} slugs already scraped")

    # Filter out finished slugs BEFORE applying --limit; otherwise
    # `--resume --limit N` keeps revisiting the same first N slugs and never
    # makes progress. Only a positive limit truncates (a negative value would
    # otherwise slice slugs off the end).
    pending = [s for s in slugs if s not in done_slugs]
    skipped = len(slugs) - len(pending)
    if args.limit > 0:
        pending = pending[: args.limit]

    results: list[dict[str, Any]] = []
    failed = 0

    _UA = (
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
    )

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        try:
            for i, slug in enumerate(pending):
                # Delay between requests only — no pointless wait after the last one.
                if i:
                    time.sleep(args.delay)

                url = BASE_URL.format(slug=slug)
                print(f"[{i+1}/{len(pending)}] {slug} … ", end="", flush=True)

                # Use a fresh browser context per slug to avoid Cloudflare session-level
                # bot detection, which fires on the 2nd+ request in the same context.
                context = browser.new_context(
                    user_agent=_UA,
                    viewport={"width": 1280, "height": 900},
                )

                try:
                    # Page creation lives inside the try so a failure here is
                    # counted as a failed slug and the context is still closed.
                    page = context.new_page()
                    page.goto(url, timeout=NAV_TIMEOUT_MS, wait_until="domcontentloaded")
                    page.wait_for_timeout(RENDER_WAIT_MS)
                    recipe = _parse_recipe(page, slug, source_urls.get(slug, url))
                except PWTimeout:
                    print("TIMEOUT")
                    failed += 1
                except Exception as exc:
                    # Best-effort scrape: one bad page must not abort the whole run.
                    print(f"ERROR: {exc}")
                    failed += 1
                else:
                    if recipe is None:
                        print("no content (404 or redirect)")
                        failed += 1
                    else:
                        n = len(recipe["RecipeIngredientParts"])
                        s = len(recipe["RecipeInstructions"])
                        if recipe["HasFullRecipe"]:
                            print(f"OK ({n} ingredients, {s} steps)")
                        else:
                            print(f"partial (ings={n}, steps={s})")
                        # Partial recipes are kept too; HasFullRecipe marks them.
                        results.append(recipe)
                finally:
                    context.close()
        finally:
            browser.close()

    print(f"\nDone — {len(results)} scraped, {skipped} skipped, {failed} failed")

    if results:
        df_out = pd.DataFrame(results)
        args.out.parent.mkdir(parents=True, exist_ok=True)
        # When resuming, append to the previous output; on a duplicate slug
        # the freshly scraped row wins.
        if df_prev is not None:
            df_out = pd.concat([df_prev, df_out], ignore_index=True)
            df_out = df_out.drop_duplicates(subset=["Slug"], keep="last")
        df_out.to_parquet(args.out, index=False)
        full_count = df_out["HasFullRecipe"].sum() if "HasFullRecipe" in df_out.columns else "?"
        print(f"Saved {len(df_out)} rows to {args.out} ({full_count} with full recipes)")
    else:
        print("No results — output not written")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
|
@ -37,12 +37,12 @@ import requests
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
CDX_BASE = "http://web.archive.org/cdx/search/cdx"
|
CDX_BASE = "https://web.archive.org/cdx/search/cdx"
|
||||||
WB_BASE = "https://web.archive.org/web"
|
WB_BASE = "https://web.archive.org/web"
|
||||||
PC_HOST = "www.purplecarrot.com"
|
PC_HOST = "www.purplecarrot.com"
|
||||||
|
|
||||||
REPLAY_DELAY = 1.2
|
REPLAY_DELAY = 2.0
|
||||||
CDX_DELAY = 0.5
|
CDX_DELAY = 3.0 # archive.org CDX rate-limits aggressively; be polite
|
||||||
|
|
||||||
DEFAULT_SLUGS = Path("/Library/Assets/kiwi/pipeline/pc_slugs.jsonl")
|
DEFAULT_SLUGS = Path("/Library/Assets/kiwi/pipeline/pc_slugs.jsonl")
|
||||||
DEFAULT_OUT = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot.parquet")
|
DEFAULT_OUT = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot.parquet")
|
||||||
|
|
@ -54,29 +54,41 @@ _REDUX_STATE_RE = re.compile(r'window\.__INITIAL_STATE__\s*=\s*(\{.*?\});\s*\n',
|
||||||
|
|
||||||
# ── Wayback helpers ───────────────────────────────────────────────────────────
|
# ── Wayback helpers ───────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _cdx_get(params: dict) -> list:
    """Query the CDX endpoint with retries and return the decoded JSON rows.

    Up to four attempts are made. A 429/503 response triggers an exponential
    back-off (15s, 30s, 60s); a transport or JSON-decoding error triggers a
    shorter linear back-off (5s, 10s, 15s). No sleep happens after the final
    attempt, since nothing would follow it.

    Args:
        params: Query-string parameters passed to the CDX endpoint.

    Returns:
        The decoded JSON list, or ``[]`` when every attempt failed or the
        payload was empty or not a list.
    """
    max_attempts = 4
    for attempt in range(max_attempts):
        is_last = attempt == max_attempts - 1
        try:
            resp = requests.get(CDX_BASE, params=params, timeout=25)
            if resp.status_code in (429, 503):
                if is_last:
                    logger.debug("CDX %s — giving up after %d attempts",
                                 resp.status_code, max_attempts)
                    break
                wait = 15 * (2 ** attempt)
                logger.debug("CDX %s — backing off %ds", resp.status_code, wait)
                time.sleep(wait)
                continue
            resp.raise_for_status()
            rows = resp.json()
        except (requests.RequestException, ValueError) as exc:
            # RequestException covers transport/HTTP errors; ValueError covers
            # a body that is not valid JSON.
            logger.debug("CDX attempt %d failed: %s", attempt + 1, exc)
            if not is_last:
                time.sleep(5 * (attempt + 1))
            continue
        # Callers index and slice the result, so never hand back a non-list
        # payload (e.g. a JSON object or null).
        return rows if isinstance(rows, list) else []
    return []
|
||||||
|
|
||||||
|
|
||||||
def _cdx_timestamps(slug: str) -> list[str]:
|
def _cdx_timestamps(slug: str) -> list[str]:
|
||||||
"""Return all captured timestamps for a product slug, oldest first."""
|
"""Return captured timestamps for a product slug, oldest first (pre-2022 window)."""
|
||||||
url = f"{PC_HOST}/api/v1/products/{slug}"
|
rows = _cdx_get({
|
||||||
try:
|
"url": f"{PC_HOST}/api/v1/products/{slug}",
|
||||||
resp = requests.get(
|
"output": "json",
|
||||||
CDX_BASE,
|
"fl": "timestamp,statuscode",
|
||||||
params={
|
"filter": "statuscode:200",
|
||||||
"url": url,
|
"limit": "20",
|
||||||
"output": "json",
|
# Pre-HelloFresh-acquisition captures (2019-2021) are most likely
|
||||||
"fl": "timestamp,statuscode",
|
# to have full instructions — API stripped them post-acquisition.
|
||||||
"filter": "statuscode:200",
|
"from": "20190101",
|
||||||
"limit": "20",
|
"to": "20211231",
|
||||||
},
|
})
|
||||||
timeout=20,
|
if len(rows) < 2:
|
||||||
)
|
|
||||||
resp.raise_for_status()
|
|
||||||
rows = resp.json()
|
|
||||||
if len(rows) < 2:
|
|
||||||
return []
|
|
||||||
return [row[0] for row in rows[1:]] # timestamps only, oldest first
|
|
||||||
except Exception as exc:
|
|
||||||
logger.debug("CDX timestamps failed for %s: %s", slug, exc)
|
|
||||||
return []
|
return []
|
||||||
|
return [row[0] for row in rows[1:]] # timestamps only, oldest first
|
||||||
|
|
||||||
|
|
||||||
def _wayback_json(url: str, timestamp: str) -> Any | None:
|
def _wayback_json(url: str, timestamp: str) -> Any | None:
|
||||||
|
|
@ -172,6 +184,9 @@ def _extract_from_api(data: dict) -> dict | None:
|
||||||
description = sku.get("description") or ""
|
description = sku.get("description") or ""
|
||||||
|
|
||||||
images = sku.get("hero_images") or sku.get("image_versions") or []
|
images = sku.get("hero_images") or sku.get("image_versions") or []
|
||||||
|
# hero_images can be a list OR a dict keyed by size string — normalise to list
|
||||||
|
if isinstance(images, dict):
|
||||||
|
images = list(images.values())
|
||||||
image_url = ""
|
image_url = ""
|
||||||
if images and isinstance(images[0], dict):
|
if images and isinstance(images[0], dict):
|
||||||
image_url = images[0].get("image_url") or images[0].get("url") or ""
|
image_url = images[0].get("image_url") or images[0].get("url") or ""
|
||||||
|
|
@ -319,23 +334,14 @@ def fetch_recipe(slug: str, manifest_meta: dict) -> dict | None:
|
||||||
|
|
||||||
# HTML fallback when API has no steps/ingredients
|
# HTML fallback when API has no steps/ingredients
|
||||||
if not recipe or not recipe.get("has_full_recipe"):
|
if not recipe or not recipe.get("has_full_recipe"):
|
||||||
html_cdx_url = f"{PC_HOST}/recipe/{slug}"
|
html_ts_rows = _cdx_get({
|
||||||
try:
|
"url": f"{PC_HOST}/recipe/{slug}",
|
||||||
html_resp = requests.get(
|
"output": "json",
|
||||||
CDX_BASE,
|
"fl": "timestamp,statuscode",
|
||||||
params={
|
"filter": "statuscode:200",
|
||||||
"url": html_cdx_url,
|
"limit": "10",
|
||||||
"output": "json",
|
})
|
||||||
"fl": "timestamp,statuscode",
|
html_timestamps = [row[0] for row in html_ts_rows[1:]] if len(html_ts_rows) > 1 else []
|
||||||
"filter": "statuscode:200",
|
|
||||||
"limit": "5",
|
|
||||||
},
|
|
||||||
timeout=20,
|
|
||||||
)
|
|
||||||
html_ts_rows = html_resp.json() if html_resp.ok else []
|
|
||||||
html_timestamps = [row[0] for row in html_ts_rows[1:]] if len(html_ts_rows) > 1 else []
|
|
||||||
except Exception:
|
|
||||||
html_timestamps = []
|
|
||||||
time.sleep(CDX_DELAY)
|
time.sleep(CDX_DELAY)
|
||||||
|
|
||||||
for ts in html_timestamps:
|
for ts in html_timestamps:
|
||||||
|
|
@ -522,6 +528,9 @@ def main() -> None:
|
||||||
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
|
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
from scripts.pipeline.log_utils import attach_pipeline_log
|
||||||
|
attach_pipeline_log("scrape_recipes")
|
||||||
|
|
||||||
scrape(args.slugs, args.out, resume=args.resume)
|
scrape(args.slugs, args.out, resume=args.resume)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
41
scripts/pipeline/purple_carrot/weekly_harvest.sh
Executable file
41
scripts/pipeline/purple_carrot/weekly_harvest.sh
Executable file
|
|
@ -0,0 +1,41 @@
|
||||||
|
#!/usr/bin/env bash
# Weekly Purple Carrot recipe harvest
# Runs every Sunday night via cron.
# Three steps: discover this week's menu, scrape full recipe data for any
# slugs not scraped yet, then ingest the scraped recipes into the corpus DB.
# Logs to /Library/Assets/kiwi/pipeline/logs/purple_carrot_harvest.log

# Abort on the first failing command, unset variable, or failed pipeline stage
# (pipefail matters here because every step is piped through tee).
set -euo pipefail

REPO="/Library/Development/CircuitForge/kiwi"
MENU_OUT="/Library/Assets/kiwi/pipeline/recipes_purplecarrot_menu.parquet"
LIVE_OUT="/Library/Assets/kiwi/pipeline/recipes_purplecarrot_live.parquet"
LOG_DIR="/Library/Assets/kiwi/pipeline/logs"
LOG="$LOG_DIR/purple_carrot_harvest.log"

mkdir -p "$LOG_DIR"

echo "=== Purple Carrot harvest $(date -u '+%Y-%m-%d %H:%M UTC') ===" >> "$LOG"

cd "$REPO"

# Step 1: discover this week's menu slugs
echo "[1/3] Discovering current menu slugs..." | tee -a "$LOG"
conda run -n cf python3 scripts/pipeline/purple_carrot/discover_current_menu.py \
    --out "$MENU_OUT" 2>&1 | tee -a "$LOG"

# Step 2: scrape full recipe data for new slugs only (--resume skips already-scraped)
echo "[2/3] Scraping live recipe pages..." | tee -a "$LOG"
conda run -n cf python3 scripts/pipeline/purple_carrot/scrape_live.py \
    --slugs-from "$MENU_OUT" \
    --out "$LIVE_OUT" \
    --resume \
    --delay 3.0 2>&1 | tee -a "$LOG"

# Step 3: ingest new recipes into the shared corpus DB
echo "[3/3] Ingesting into corpus DB..." | tee -a "$LOG"
conda run -n cf python3 scripts/pipeline/ingest_purplecarrot.py \
    --parquet "$LIVE_OUT" \
    --db /Library/Assets/kiwi/kiwi.db 2>&1 | tee -a "$LOG"

echo "=== Done $(date -u '+%Y-%m-%d %H:%M UTC') ===" >> "$LOG"
echo "" >> "$LOG"
|
||||||
Loading…
Reference in a new issue