feat(pipeline): Purple Carrot scraper hardening + shared pipeline logging
scrape_recipes.py:
- Switch CDX to HTTPS (avoids HTTP 503 rate-limit bucket)
- Restrict product API CDX to 2019–2021 window (pre-HelloFresh instruction stripping)
- Replace inline CDX requests with _cdx_get() helper: retries on 429/503 with exponential backoff (15s, 30s, 60s, 120s)
- Increase HTML fallback CDX limit from 5 to 10 timestamps
- Bump CDX_DELAY 0.5s → 3.0s and REPLAY_DELAY 1.2s → 2.0s (polite scraping)
- Fix KeyError: 0 on hero_images dict (normalise dict to list before indexing)

discover_wayback.py:
- Switch CDX to HTTPS

scripts/pipeline/log_utils.py (new):
- attach_pipeline_log(script_name): adds a JSON FileHandler to the root logger writing to /Library/Assets/logs/pipeline/<script>_<ts>.jsonl for Avocet Turnstone training data ingestion (kiwi#141 / avocet#67)
This commit is contained in:
parent
84636bcdaf
commit
56f942b3fd
3 changed files with 122 additions and 42 deletions
68
scripts/pipeline/log_utils.py
Normal file
68
scripts/pipeline/log_utils.py
Normal file
|
|
@ -0,0 +1,68 @@
|
||||||
|
"""
|
||||||
|
Pipeline logging utility.
|
||||||
|
|
||||||
|
Adds a structured JSON FileHandler to the root logger so every pipeline
|
||||||
|
script automatically writes machine-readable logs to the shared datastore
|
||||||
|
at /Library/Assets/logs/pipeline/. Avocet ingests these for Turnstone
|
||||||
|
logreading training (kiwi#141 / avocet#67).
|
||||||
|
|
||||||
|
Usage (add near the top of main() after logging.basicConfig):
|
||||||
|
|
||||||
|
from scripts.pipeline.log_utils import attach_pipeline_log
|
||||||
|
attach_pipeline_log("scrape_recipes")
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
# Directory that receives the per-run JSONL pipeline logs.
# Override with the PIPELINE_LOG_DIR environment variable. An unset *or empty*
# value falls back to the shared datastore default (an empty string would
# otherwise resolve to the current working directory), and a leading "~" in
# the override is expanded to the user's home directory.
PIPELINE_LOG_DIR = Path(
    os.environ.get("PIPELINE_LOG_DIR") or "/Library/Assets/logs/pipeline"
).expanduser()
|
||||||
|
|
||||||
|
|
||||||
|
class _JsonFormatter(logging.Formatter):
    """Render each log record as one single-line JSON object (JSONL).

    Keys always present: ``ts`` (UTC ISO-8601 built from ``record.created``),
    ``level``, ``logger`` and ``msg`` (the %-interpolated message).
    Optional keys: ``exc`` (formatted traceback) when the record carries
    exception info, ``stack`` when it carries stack info, and ``extra`` with
    any attributes supplied via ``logger.info("...", extra={...})``.
    """

    # Attributes the logging machinery itself puts on a LogRecord; anything
    # else found in ``record.__dict__`` is treated as caller-supplied extra.
    # "asctime" must be listed: a text Formatter whose format string uses
    # %(asctime)s stores it on the shared record, and such a handler may run
    # before this one, which would otherwise leak it into every "extra".
    _STANDARD_ATTRS = frozenset({
        "name", "msg", "args", "levelname", "levelno", "pathname",
        "filename", "module", "exc_info", "exc_text", "stack_info",
        "lineno", "funcName", "created", "msecs", "relativeCreated",
        "thread", "threadName", "processName", "process", "message",
        "asctime", "taskName",
    })

    def format(self, record: logging.LogRecord) -> str:
        """Return *record* serialised as a JSON string (no trailing newline)."""
        payload: dict = {
            "ts": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "msg": record.getMessage(),
        }
        if record.exc_info:
            payload["exc"] = self.formatException(record.exc_info)
        if record.stack_info:
            # Present only when the caller passed stack_info=True.
            payload["stack"] = self.formatStack(record.stack_info)

        # Any extra kwargs passed via logger.info("...", extra={...})
        extra = {
            key: value
            for key, value in record.__dict__.items()
            if key not in self._STANDARD_ATTRS
        }
        if extra:
            payload["extra"] = extra

        # default=str is deliberate: extras are arbitrary caller objects
        # (Path, datetime, exceptions, ...). Stringifying them keeps the log
        # line instead of losing the whole record to a TypeError.
        return json.dumps(payload, default=str)
|
||||||
|
|
||||||
|
|
||||||
|
def attach_pipeline_log(script_name: str) -> Path:
    """Send root-logger output to a fresh JSONL file for this pipeline run.

    Creates PIPELINE_LOG_DIR if needed, opens
    ``<script_name>_<UTC timestamp>.jsonl`` inside it, and registers a
    DEBUG-level file handler formatted by _JsonFormatter on the root logger.
    An INFO record naming the file is logged once the handler is attached.

    Returns the path of the log file.
    """
    PIPELINE_LOG_DIR.mkdir(parents=True, exist_ok=True)

    # Second-resolution UTC stamp keeps one file per script invocation.
    stamp = datetime.now(tz=timezone.utc).strftime("%Y%m%dT%H%M%S")
    destination = PIPELINE_LOG_DIR / f"{script_name}_{stamp}.jsonl"

    json_handler = logging.FileHandler(destination, encoding="utf-8")
    json_handler.setFormatter(_JsonFormatter())
    json_handler.setLevel(logging.DEBUG)

    root_logger = logging.getLogger()
    root_logger.addHandler(json_handler)

    module_logger = logging.getLogger(__name__)
    module_logger.info(
        "Pipeline log: %s", destination, extra={"script": script_name}
    )
    return destination
|
||||||
|
|
@ -31,7 +31,7 @@ import requests
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
CDX_BASE = "http://web.archive.org/cdx/search/cdx"
|
CDX_BASE = "https://web.archive.org/cdx/search/cdx"
|
||||||
WB_BASE = "https://web.archive.org/web"
|
WB_BASE = "https://web.archive.org/web"
|
||||||
PC_HOST = "www.purplecarrot.com"
|
PC_HOST = "www.purplecarrot.com"
|
||||||
|
|
||||||
|
|
@ -291,6 +291,9 @@ def main() -> None:
|
||||||
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
|
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
from scripts.pipeline.log_utils import attach_pipeline_log
|
||||||
|
attach_pipeline_log("discover_wayback")
|
||||||
|
|
||||||
discover(args.out)
|
discover(args.out)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -37,12 +37,12 @@ import requests
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
CDX_BASE = "http://web.archive.org/cdx/search/cdx"
|
CDX_BASE = "https://web.archive.org/cdx/search/cdx"
|
||||||
WB_BASE = "https://web.archive.org/web"
|
WB_BASE = "https://web.archive.org/web"
|
||||||
PC_HOST = "www.purplecarrot.com"
|
PC_HOST = "www.purplecarrot.com"
|
||||||
|
|
||||||
REPLAY_DELAY = 1.2
|
REPLAY_DELAY = 2.0
|
||||||
CDX_DELAY = 0.5
|
CDX_DELAY = 3.0 # archive.org CDX rate-limits aggressively; be polite
|
||||||
|
|
||||||
DEFAULT_SLUGS = Path("/Library/Assets/kiwi/pipeline/pc_slugs.jsonl")
|
DEFAULT_SLUGS = Path("/Library/Assets/kiwi/pipeline/pc_slugs.jsonl")
|
||||||
DEFAULT_OUT = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot.parquet")
|
DEFAULT_OUT = Path("/Library/Assets/kiwi/pipeline/recipes_purplecarrot.parquet")
|
||||||
|
|
@ -54,29 +54,41 @@ _REDUX_STATE_RE = re.compile(r'window\.__INITIAL_STATE__\s*=\s*(\{.*?\});\s*\n',
|
||||||
|
|
||||||
# ── Wayback helpers ───────────────────────────────────────────────────────────
|
# ── Wayback helpers ───────────────────────────────────────────────────────────
|
||||||
|
|
||||||
def _cdx_timestamps(slug: str) -> list[str]:
|
def _cdx_get(params: dict) -> list:
|
||||||
"""Return all captured timestamps for a product slug, oldest first."""
|
"""CDX request with retry on 429/503 (archive.org rate-limits aggressively)."""
|
||||||
url = f"{PC_HOST}/api/v1/products/{slug}"
|
for attempt in range(4):
|
||||||
try:
|
try:
|
||||||
resp = requests.get(
|
resp = requests.get(CDX_BASE, params=params, timeout=25)
|
||||||
CDX_BASE,
|
if resp.status_code in (429, 503):
|
||||||
params={
|
wait = 15 * (2 ** attempt)
|
||||||
"url": url,
|
logger.debug("CDX %s — backing off %ds", resp.status_code, wait)
|
||||||
|
time.sleep(wait)
|
||||||
|
continue
|
||||||
|
resp.raise_for_status()
|
||||||
|
rows = resp.json()
|
||||||
|
return rows if rows else []
|
||||||
|
except Exception as exc:
|
||||||
|
logger.debug("CDX attempt %d failed: %s", attempt + 1, exc)
|
||||||
|
time.sleep(5 * (attempt + 1))
|
||||||
|
return []
|
||||||
|
|
||||||
|
|
||||||
|
def _cdx_timestamps(slug: str) -> list[str]:
|
||||||
|
"""Return captured timestamps for a product slug, oldest first (pre-2022 window)."""
|
||||||
|
rows = _cdx_get({
|
||||||
|
"url": f"{PC_HOST}/api/v1/products/{slug}",
|
||||||
"output": "json",
|
"output": "json",
|
||||||
"fl": "timestamp,statuscode",
|
"fl": "timestamp,statuscode",
|
||||||
"filter": "statuscode:200",
|
"filter": "statuscode:200",
|
||||||
"limit": "20",
|
"limit": "20",
|
||||||
},
|
# Pre-HelloFresh-acquisition captures (2019-2021) are most likely
|
||||||
timeout=20,
|
# to have full instructions — API stripped them post-acquisition.
|
||||||
)
|
"from": "20190101",
|
||||||
resp.raise_for_status()
|
"to": "20211231",
|
||||||
rows = resp.json()
|
})
|
||||||
if len(rows) < 2:
|
if len(rows) < 2:
|
||||||
return []
|
return []
|
||||||
return [row[0] for row in rows[1:]] # timestamps only, oldest first
|
return [row[0] for row in rows[1:]] # timestamps only, oldest first
|
||||||
except Exception as exc:
|
|
||||||
logger.debug("CDX timestamps failed for %s: %s", slug, exc)
|
|
||||||
return []
|
|
||||||
|
|
||||||
|
|
||||||
def _wayback_json(url: str, timestamp: str) -> Any | None:
|
def _wayback_json(url: str, timestamp: str) -> Any | None:
|
||||||
|
|
@ -172,6 +184,9 @@ def _extract_from_api(data: dict) -> dict | None:
|
||||||
description = sku.get("description") or ""
|
description = sku.get("description") or ""
|
||||||
|
|
||||||
images = sku.get("hero_images") or sku.get("image_versions") or []
|
images = sku.get("hero_images") or sku.get("image_versions") or []
|
||||||
|
# hero_images can be a list OR a dict keyed by size string — normalise to list
|
||||||
|
if isinstance(images, dict):
|
||||||
|
images = list(images.values())
|
||||||
image_url = ""
|
image_url = ""
|
||||||
if images and isinstance(images[0], dict):
|
if images and isinstance(images[0], dict):
|
||||||
image_url = images[0].get("image_url") or images[0].get("url") or ""
|
image_url = images[0].get("image_url") or images[0].get("url") or ""
|
||||||
|
|
@ -319,23 +334,14 @@ def fetch_recipe(slug: str, manifest_meta: dict) -> dict | None:
|
||||||
|
|
||||||
# HTML fallback when API has no steps/ingredients
|
# HTML fallback when API has no steps/ingredients
|
||||||
if not recipe or not recipe.get("has_full_recipe"):
|
if not recipe or not recipe.get("has_full_recipe"):
|
||||||
html_cdx_url = f"{PC_HOST}/recipe/{slug}"
|
html_ts_rows = _cdx_get({
|
||||||
try:
|
"url": f"{PC_HOST}/recipe/{slug}",
|
||||||
html_resp = requests.get(
|
|
||||||
CDX_BASE,
|
|
||||||
params={
|
|
||||||
"url": html_cdx_url,
|
|
||||||
"output": "json",
|
"output": "json",
|
||||||
"fl": "timestamp,statuscode",
|
"fl": "timestamp,statuscode",
|
||||||
"filter": "statuscode:200",
|
"filter": "statuscode:200",
|
||||||
"limit": "5",
|
"limit": "10",
|
||||||
},
|
})
|
||||||
timeout=20,
|
|
||||||
)
|
|
||||||
html_ts_rows = html_resp.json() if html_resp.ok else []
|
|
||||||
html_timestamps = [row[0] for row in html_ts_rows[1:]] if len(html_ts_rows) > 1 else []
|
html_timestamps = [row[0] for row in html_ts_rows[1:]] if len(html_ts_rows) > 1 else []
|
||||||
except Exception:
|
|
||||||
html_timestamps = []
|
|
||||||
time.sleep(CDX_DELAY)
|
time.sleep(CDX_DELAY)
|
||||||
|
|
||||||
for ts in html_timestamps:
|
for ts in html_timestamps:
|
||||||
|
|
@ -522,6 +528,9 @@ def main() -> None:
|
||||||
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
|
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
from scripts.pipeline.log_utils import attach_pipeline_log
|
||||||
|
attach_pipeline_log("scrape_recipes")
|
||||||
|
|
||||||
scrape(args.slugs, args.out, resume=args.resume)
|
scrape(args.slugs, args.out, resume=args.resume)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue