diff --git a/scripts/pipeline/log_utils.py b/scripts/pipeline/log_utils.py new file mode 100644 index 0000000..7e8e40f --- /dev/null +++ b/scripts/pipeline/log_utils.py @@ -0,0 +1,68 @@ +""" +Pipeline logging utility. + +Adds a structured JSON FileHandler to the root logger so every pipeline +script automatically writes machine-readable logs to the shared datastore +at /Library/Assets/logs/pipeline/. Avocet ingests these for Turnstone +logreading training (kiwi#141 / avocet#67). + +Usage (add near the top of main() after logging.basicConfig): + + from scripts.pipeline.log_utils import attach_pipeline_log + attach_pipeline_log("scrape_recipes") +""" +from __future__ import annotations + +import json +import logging +import os +from datetime import datetime, timezone +from pathlib import Path + +PIPELINE_LOG_DIR = Path( + os.environ.get("PIPELINE_LOG_DIR", "/Library/Assets/logs/pipeline") +) + + +class _JsonFormatter(logging.Formatter): + def format(self, record: logging.LogRecord) -> str: + payload: dict = { + "ts": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(), + "level": record.levelname, + "logger": record.name, + "msg": record.getMessage(), + } + if record.exc_info: + payload["exc"] = self.formatException(record.exc_info) + # Any extra kwargs passed via logger.info("...", extra={...}) + standard = { + "name", "msg", "args", "levelname", "levelno", "pathname", + "filename", "module", "exc_info", "exc_text", "stack_info", + "lineno", "funcName", "created", "msecs", "relativeCreated", + "thread", "threadName", "processName", "process", "message", + "taskName", + } + extra = {k: v for k, v in record.__dict__.items() if k not in standard} + if extra: + payload["extra"] = extra + return json.dumps(payload) + + +def attach_pipeline_log(script_name: str) -> Path: + """Attach a JSON file handler to the root logger for pipeline logging. + + Returns the path of the log file created. + """ + PIPELINE_LOG_DIR.mkdir(parents=True, exist_ok=True) + ts = datetime.now(tz=timezone.utc).strftime("%Y%m%dT%H%M%S") + log_path = PIPELINE_LOG_DIR / f"{script_name}_{ts}.jsonl" + + handler = logging.FileHandler(log_path, encoding="utf-8") + handler.setLevel(logging.DEBUG) + handler.setFormatter(_JsonFormatter()) + logging.getLogger().addHandler(handler) + + logging.getLogger(__name__).info( + "Pipeline log: %s", log_path, extra={"script": script_name} + ) + return log_path diff --git a/scripts/pipeline/purple_carrot/discover_wayback.py b/scripts/pipeline/purple_carrot/discover_wayback.py index 1c32bf1..ae894d2 100644 --- a/scripts/pipeline/purple_carrot/discover_wayback.py +++ b/scripts/pipeline/purple_carrot/discover_wayback.py @@ -31,7 +31,7 @@ import requests logger = logging.getLogger(__name__) -CDX_BASE = "http://web.archive.org/cdx/search/cdx" +CDX_BASE = "https://web.archive.org/cdx/search/cdx" WB_BASE = "https://web.archive.org/web" PC_HOST = "www.purplecarrot.com" @@ -291,6 +291,9 @@ def main() -> None: format="%(asctime)s %(levelname)s %(name)s: %(message)s", ) + from scripts.pipeline.log_utils import attach_pipeline_log + attach_pipeline_log("discover_wayback") + discover(args.out) diff --git a/scripts/pipeline/purple_carrot/scrape_recipes.py b/scripts/pipeline/purple_carrot/scrape_recipes.py index 72672d1..bb84453 100644 --- a/scripts/pipeline/purple_carrot/scrape_recipes.py +++ b/scripts/pipeline/purple_carrot/scrape_recipes.py @@ -37,12 +37,12 @@ import requests logger = logging.getLogger(__name__) -CDX_BASE = "http://web.archive.org/cdx/search/cdx" +CDX_BASE = "https://web.archive.org/cdx/search/cdx" WB_BASE = "https://web.archive.org/web" PC_HOST = "www.purplecarrot.com" -REPLAY_DELAY = 1.2 -CDX_DELAY = 0.5 +REPLAY_DELAY = 2.0 +CDX_DELAY = 3.0 # archive.org CDX rate-limits aggressively; be polite DEFAULT_SLUGS = Path("/Library/Assets/kiwi/pipeline/pc_slugs.jsonl") DEFAULT_OUT = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot.parquet") @@ -54,29 +54,41 @@ _REDUX_STATE_RE = re.compile(r'window\.__INITIAL_STATE__\s*=\s*(\{.*?\});\s*\n', # ── Wayback helpers ─────────────────────────────────────────────────────────── +def _cdx_get(params: dict) -> list: + """CDX request with retry on 429/503 (archive.org rate-limits aggressively).""" + for attempt in range(4): + try: + resp = requests.get(CDX_BASE, params=params, timeout=25) + if resp.status_code in (429, 503): + wait = 15 * (2 ** attempt) + logger.debug("CDX %s — backing off %ds", resp.status_code, wait) + time.sleep(wait) + continue + resp.raise_for_status() + rows = resp.json() + return rows if rows else [] + except Exception as exc: + logger.debug("CDX attempt %d failed: %s", attempt + 1, exc) + time.sleep(5 * (attempt + 1)) + return [] + + def _cdx_timestamps(slug: str) -> list[str]: - """Return all captured timestamps for a product slug, oldest first.""" - url = f"{PC_HOST}/api/v1/products/{slug}" - try: - resp = requests.get( - CDX_BASE, - params={ - "url": url, - "output": "json", - "fl": "timestamp,statuscode", - "filter": "statuscode:200", - "limit": "20", - }, - timeout=20, - ) - resp.raise_for_status() - rows = resp.json() - if len(rows) < 2: - return [] - return [row[0] for row in rows[1:]] # timestamps only, oldest first - except Exception as exc: - logger.debug("CDX timestamps failed for %s: %s", slug, exc) + """Return captured timestamps for a product slug, oldest first (pre-2022 window).""" + rows = _cdx_get({ + "url": f"{PC_HOST}/api/v1/products/{slug}", + "output": "json", + "fl": "timestamp,statuscode", + "filter": "statuscode:200", + "limit": "20", + # Pre-HelloFresh-acquisition captures (2019-2021) are most likely + # to have full instructions — API stripped them post-acquisition. + "from": "20190101", + "to": "20211231", + }) + if len(rows) < 2: return [] + return [row[0] for row in rows[1:]] # timestamps only, oldest first def _wayback_json(url: str, timestamp: str) -> Any | None: @@ -172,6 +184,9 @@ def _extract_from_api(data: dict) -> dict | None: description = sku.get("description") or "" images = sku.get("hero_images") or sku.get("image_versions") or [] + # hero_images can be a list OR a dict keyed by size string — normalise to list + if isinstance(images, dict): + images = list(images.values()) image_url = "" if images and isinstance(images[0], dict): image_url = images[0].get("image_url") or images[0].get("url") or "" @@ -319,23 +334,14 @@ def fetch_recipe(slug: str, manifest_meta: dict) -> dict | None: # HTML fallback when API has no steps/ingredients if not recipe or not recipe.get("has_full_recipe"): - html_cdx_url = f"{PC_HOST}/recipe/{slug}" - try: - html_resp = requests.get( - CDX_BASE, - params={ - "url": html_cdx_url, - "output": "json", - "fl": "timestamp,statuscode", - "filter": "statuscode:200", - "limit": "5", - }, - timeout=20, - ) - html_ts_rows = html_resp.json() if html_resp.ok else [] - html_timestamps = [row[0] for row in html_ts_rows[1:]] if len(html_ts_rows) > 1 else [] - except Exception: - html_timestamps = [] + html_ts_rows = _cdx_get({ + "url": f"{PC_HOST}/recipe/{slug}", + "output": "json", + "fl": "timestamp,statuscode", + "filter": "statuscode:200", + "limit": "10", + }) + html_timestamps = [row[0] for row in html_ts_rows[1:]] if len(html_ts_rows) > 1 else [] time.sleep(CDX_DELAY) for ts in html_timestamps: @@ -522,6 +528,9 @@ def main() -> None: format="%(asctime)s %(levelname)s %(name)s: %(message)s", ) + from scripts.pipeline.log_utils import attach_pipeline_log + attach_pipeline_log("scrape_recipes") + scrape(args.slugs, args.out, resume=args.resume)