feat(mercari): Phase 2 — MercariAdapter with Xvfb stability fixes

Implements full Mercari scraping support for the trust-scoring pipeline:

- `app/platforms/mercari/` — new MercariAdapter (scraper-based), scraper
  (parse_search_html / parse_listing_html), and __init__
- `app/platforms/__init__.py` — adds "mercari" to SUPPORTED_PLATFORMS
- `api/main.py` — platform routing: _make_adapter, OR-group guard, seller
  lookup, BTF/Trading API guards all parameterised by platform
- `web/src/views/SearchView.vue` — enables Mercari tab in platform picker

BrowserPool stability fixes (browser_pool.py):
- Add -ac flag to Xvfb (disables X11 auth requirement in Docker containers)
- Shift display counter from :100-:199 to :200-:399 (avoids ghost kernel
  socket conflicts with low-numbered displays)
- Add wait_for_selector / wait_for_timeout_ms params to fetch_html,
  _fetch_with_slot, _fetch_fresh
- Add time.sleep(0.3) in _fetch_fresh after Xvfb start (was missing)

Mercari scraper fix:
- Remove sortBy=SORT_SCORE from build_search_url — that param is deprecated
  on Mercari and causes an empty 85KB response instead of search results

Probe + debug scripts in scripts/:
- probe_mercari.py — standalone Cloudflare bypass test
- debug_fetch_fresh.py — pool simulation diagnostic

Trust signal coverage: feedback_count + feedback_ratio → partial score
(account_age_days and category_history absent, so score_is_partial=True; sketch below).
get_completed_sales stubbed for Phase 3.
Tracks: snipe#53 (pool thread-safety fix, follow-up)
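
Illustrative sketch (not part of this commit) of how a partial score falls out when only
the feedback signals are present; the helper below is hypothetical — the real logic lives
in the trust scorer:

from typing import Optional

def score_partial(signals: dict[str, Optional[float]]) -> tuple[float, bool]:
    # Hypothetical stand-in for the aggregator: average the signals that are
    # present and flag the result as partial when any signal is missing.
    available = {k: v for k, v in signals.items() if v is not None}
    score = sum(available.values()) / len(available) if available else 0.0
    return score, len(available) < len(signals)

score, score_is_partial = score_partial({
    "feedback_count_norm": 0.9,     # from Mercari NumSales
    "feedback_ratio": 0.96,         # data-stars / 5
    "account_age_days_norm": None,  # absent on Mercari
    "category_history_norm": None,  # absent on Mercari
})
assert score_is_partial is True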
Author: pyr0ball, 2026-05-03 18:39:25 -07:00
Commit: 15996472b7 (parent: f48f8ef80f)
9 changed files with 677 additions and 100 deletions

api/main.py

@@ -664,22 +664,22 @@ def _try_trading_api_enrichment(
     return enriched

-def _make_adapter(shared_store: Store, force: str = "auto"):
-    """Return the appropriate adapter.
+def _make_adapter(shared_store: Store, force: str = "auto", platform: str = "ebay"):
+    """Return the appropriate adapter for the given platform.

-    force: "auto" | "api" | "scraper"
+    force: "auto" | "api" | "scraper" (ignored for non-eBay platforms)
        auto    → API if creds present, else scraper
        api     → Browse API (raises if no creds)
        scraper → Playwright scraper regardless of creds

    Adapters receive shared_store because they only read/write sellers and
    market_comps, never listings. Listings are returned and saved by the caller.
-    # Platform registry — add new adapters here as platforms are implemented.
-    # _make_adapter() currently handles eBay only. Phase 2 will add:
-    #   "mercari": MercariAdapter
-    #   "poshmark": PoshmarkAdapter
    """
+    if platform == "mercari":
+        from app.platforms.mercari import MercariAdapter
+        return MercariAdapter(shared_store)
+    # eBay
    client_id, client_secret, env = _ebay_creds()
    has_creds = bool(client_id and client_secret)

@@ -696,8 +696,10 @@ def _make_adapter(shared_store: Store, force: str = "auto"):
    return ScrapedEbayAdapter(shared_store)

-def _adapter_name(force: str = "auto") -> str:
+def _adapter_name(force: str = "auto", platform: str = "ebay") -> str:
    """Return the name of the adapter that would be used — without creating it."""
+    if platform != "ebay":
+        return platform
    client_id, client_secret, _ = _ebay_creds()
    if force == "scraper":
        return "scraper"

@@ -735,7 +737,7 @@ def search(
        q = ebay_item_id
    if not q.strip():
-        return {"listings": [], "trust_scores": {}, "sellers": {}, "market_price": None, "adapter_used": _adapter_name(adapter)}
+        return {"listings": [], "trust_scores": {}, "sellers": {}, "market_price": None, "adapter_used": _adapter_name(adapter, platform=platform)}

    # Cap pages to the tier's maximum — free cloud users get 1 page, local gets unlimited.
    features = compute_features(session.tier)

@@ -743,9 +745,8 @@ def search(
    must_exclude_terms = _parse_terms(must_exclude)

-    # In Groups mode, expand OR groups into multiple targeted eBay queries to
-    # guarantee comprehensive result coverage — eBay relevance won't silently drop variants.
-    if must_include_mode == "groups" and must_include.strip():
+    # OR-group expansion is eBay-specific; other platforms use the base query directly.
+    if platform == "ebay" and must_include_mode == "groups" and must_include.strip():
        or_groups = parse_groups(must_include)
        ebay_queries = expand_queries(q, or_groups)
    else:

@@ -772,7 +773,7 @@ def search(
        category_id=category_id.strip() or None,
    )
-    adapter_used = _adapter_name(adapter)
+    adapter_used = _adapter_name(adapter, platform=platform)

    shared_db = session.shared_db
    user_db = session.user_db

@@ -832,11 +833,11 @@ def search(
        }
        seller_map = {
            listing.seller_platform_id: dataclasses.asdict(
-                shared_store.get_seller("ebay", listing.seller_platform_id)
+                shared_store.get_seller(platform, listing.seller_platform_id)
            )
            for listing in listings
            if listing.seller_platform_id
-            and shared_store.get_seller("ebay", listing.seller_platform_id)
+            and shared_store.get_seller(platform, listing.seller_platform_id)
        }
        _is_unauthed = session.user_id == "anonymous" or session.user_id.startswith("guest:")

@@ -890,11 +891,11 @@ def search(
    # Each thread creates its own Store — sqlite3 check_same_thread=True.
    def _run_search(ebay_query: str) -> list:
-        return _make_adapter(Store(shared_db), adapter).search(ebay_query, base_filters)
+        return _make_adapter(Store(shared_db), adapter, platform=platform).search(ebay_query, base_filters)

    def _run_comps() -> None:
        try:
-            _make_adapter(Store(shared_db), adapter).get_completed_sales(comp_query, pages)
+            _make_adapter(Store(shared_db), adapter, platform=platform).get_completed_sales(comp_query, pages)
        except Exception:
            log.warning("comps: unhandled exception for %r", comp_query, exc_info=True)

@@ -943,25 +944,23 @@ def search(
    user_store.save_listings(listings)

-    # Derive category_history from accumulated listing data — free for API adapter
-    # (category_name comes from Browse API response), no-op for scraper listings (category_name=None).
-    # Reads listings from user_store, writes seller categories to shared_store.
+    # Derive category_history from accumulated listing data — eBay only
+    # (category_name comes from Browse API response; other platforms return None).
    seller_ids = list({l.seller_platform_id for l in listings if l.seller_platform_id})
+    if platform == "ebay":
        n_cat = shared_store.refresh_seller_categories("ebay", seller_ids, listing_store=user_store)
        if n_cat:
            log.info("Category history derived for %d sellers from listing data", n_cat)

    # Re-fetch to hydrate staging fields (times_seen, first_seen_at, id, price_at_first_seen)
    # that are only available from the DB after the upsert.
-    staged = user_store.get_listings_staged("ebay", [l.platform_listing_id for l in listings])
+    staged = user_store.get_listings_staged(platform, [l.platform_listing_id for l in listings])
    listings = [staged.get(l.platform_listing_id, l) for l in listings]

-    # Trading API enrichment: if the user has connected their eBay account, use
-    # Trading API GetUser to instantly fill account_age_days for sellers missing it.
-    # This is synchronous (~200ms per seller) but only runs for sellers that need
-    # enrichment — typically a small subset. Sellers resolved here are excluded from
-    # the slower BTF Playwright background pass.
-    _main_adapter = _make_adapter(shared_store, adapter)
+    # Trading API enrichment and BTF scraping are eBay-specific.
+    _main_adapter = _make_adapter(shared_store, adapter, platform=platform)
+    trading_api_enriched: set[str] = set()
+    if platform == "ebay":
        sellers_needing_age = [
            l.seller_platform_id for l in listings
            if l.seller_platform_id

@@ -975,9 +974,7 @@ def search(
            _main_adapter, sellers_needing_age, user_db
        )

-    # BTF enrichment: scrape /itm/ pages for sellers still missing account_age_days
-    # after the Trading API pass. Runs in the background so it doesn't delay the
-    # response. Live score updates are pushed to the pre-registered SSE queue.
+    # BTF enrichment: scrape /itm/ pages for sellers still missing account_age_days.
    _trigger_scraper_enrichment(
        listings, shared_store, shared_db,
        user_db=user_db, query=comp_query, session_id=session_id,

@@ -996,7 +993,7 @@ def search(
    _enqueue_vision_tasks(listings, trust_scores_list, session)

    query_hash = hashlib.md5(comp_query.encode()).hexdigest()
-    comp = shared_store.get_market_comp("ebay", query_hash)
+    comp = shared_store.get_market_comp(platform, query_hash)
    market_price = comp.median_price if comp else None

    # Store raw listings (as dicts) + market_price in cache.

@@ -1015,11 +1012,11 @@ def search(
    }
    seller_map = {
        listing.seller_platform_id: dataclasses.asdict(
-            shared_store.get_seller("ebay", listing.seller_platform_id)
+            shared_store.get_seller(platform, listing.seller_platform_id)
        )
        for listing in listings
        if listing.seller_platform_id
-        and shared_store.get_seller("ebay", listing.seller_platform_id)
+        and shared_store.get_seller(platform, listing.seller_platform_id)
    }

    # Build a preference reader for affiliate URL wrapping.

@@ -1123,7 +1120,7 @@ def search_async(
            "trust_scores": {},
            "sellers": {},
            "market_price": None,
-            "adapter_used": _adapter_name(adapter),
+            "adapter_used": _adapter_name(adapter, platform=platform),
            "affiliate_active": bool(os.environ.get("EBAY_AFFILIATE_CAMPAIGN_ID", "").strip()),
        })
        _update_queues[empty_id].put(None)

@@ -1152,7 +1149,8 @@ def search_async(
    q_norm = q  # captured from outer scope
    must_exclude_terms = _parse_terms(must_exclude)

-    if must_include_mode == "groups" and must_include.strip():
+    # OR-group expansion is eBay-specific; other platforms use the base query directly.
+    if platform == "ebay" and must_include_mode == "groups" and must_include.strip():
        or_groups = parse_groups(must_include)
        ebay_queries = expand_queries(q_norm, or_groups)
    else:

@@ -1174,7 +1172,7 @@ def search_async(
        category_id=category_id.strip() or None,
    )
-    adapter_used = _adapter_name(adapter)
+    adapter_used = _adapter_name(adapter, platform=platform)

    q_ref = _update_queues.get(session_id)
    if q_ref is None:
        return  # client disconnected before we even started

@@ -1281,11 +1279,11 @@ def search_async(
        try:
            def _run_search(ebay_query: str) -> list:
-                return _make_adapter(Store(_shared_db), adapter).search(ebay_query, base_filters)
+                return _make_adapter(Store(_shared_db), adapter, platform=platform).search(ebay_query, base_filters)

            def _run_comps() -> None:
                try:
-                    _make_adapter(Store(_shared_db), adapter).get_completed_sales(comp_query, pages)
+                    _make_adapter(Store(_shared_db), adapter, platform=platform).get_completed_sales(comp_query, pages)
                except Exception:
                    log.warning("async comps: unhandled exception for %r", comp_query, exc_info=True)

@@ -1314,14 +1312,17 @@ def search_async(
            user_store.save_listings(listings)

            seller_ids = list({l.seller_platform_id for l in listings if l.seller_platform_id})
+            if platform == "ebay":
                n_cat = shared_store.refresh_seller_categories("ebay", seller_ids, listing_store=user_store)
                if n_cat:
                    log.info("async_search: category history derived for %d sellers", n_cat)

-            staged = user_store.get_listings_staged("ebay", [l.platform_listing_id for l in listings])
+            staged = user_store.get_listings_staged(platform, [l.platform_listing_id for l in listings])
            listings = [staged.get(l.platform_listing_id, l) for l in listings]

-            _main_adapter = _make_adapter(shared_store, adapter)
+            _main_adapter = _make_adapter(shared_store, adapter, platform=platform)
+            sellers_needing_age: list[str] = []
+            if platform == "ebay":
                sellers_needing_age = [
                    l.seller_platform_id for l in listings
                    if l.seller_platform_id

@@ -1331,7 +1332,7 @@ def search_async(
                seen_set: set[str] = set()
                sellers_needing_age = [s for s in sellers_needing_age if not (s in seen_set or seen_set.add(s))]  # type: ignore[func-returns-value]

-            # Use a temporary CloudUser-like object for Trading API enrichment
+            # Use a temporary CloudUser-like object for Trading API enrichment (eBay only)
            from api.cloud_session import CloudUser as _CloudUser
            _session_stub = _CloudUser(
                user_id=_user_id,

@@ -1339,6 +1340,8 @@ def search_async(
                shared_db=_shared_db,
                user_db=_user_db,
            )
+            trading_api_enriched: set[str] = set()
+            if platform == "ebay":
                trading_api_enriched = _try_trading_api_enrichment(
                    _main_adapter, sellers_needing_age, _user_db
                )

@@ -1353,7 +1356,7 @@ def search_async(
            _enqueue_vision_tasks(listings, trust_scores_list, _session_stub)

            query_hash = _hashlib_local.md5(comp_query.encode()).hexdigest()
-            comp = shared_store.get_market_comp("ebay", query_hash)
+            comp = shared_store.get_market_comp(platform, query_hash)
            market_price = comp.median_price if comp else None

            # Store raw listings + market_price in cache (trust scores excluded).

@@ -1369,11 +1372,11 @@ def search_async(
            }
            seller_map = {
                listing.seller_platform_id: dataclasses.asdict(
-                    shared_store.get_seller("ebay", listing.seller_platform_id)
+                    shared_store.get_seller(platform, listing.seller_platform_id)
                )
                for listing in listings
                if listing.seller_platform_id
-                and shared_store.get_seller("ebay", listing.seller_platform_id)
+                and shared_store.get_seller(platform, listing.seller_platform_id)
            }

            _is_unauthed = _user_id == "anonymous" or _user_id.startswith("guest:")

@@ -1404,12 +1407,17 @@ def search_async(
                "session_id": session_id,
            })

-            # Kick off background enrichment — it pushes "update" events and the sentinel.
+            # BTF background enrichment is eBay-specific.
+            if platform == "ebay":
                _trigger_scraper_enrichment(
                    listings, shared_store, _shared_db,
                    user_db=_user_db, query=comp_query, session_id=session_id,
                    skip_seller_ids=trading_api_enriched,
                )
+            else:
+                # For non-eBay platforms, push the sentinel directly since there's no
+                # background enrichment pass.
+                _push(None)
        except _sqlite3.OperationalError as e:
            log.warning("async_search DB contention: %s", e)
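
Usage sketch of the new routing (illustrative only, not part of the diff; assumes a shared_store is in scope):

# platform routes before the eBay force/creds logic, so force is ignored for Mercari.
mercari_adapter = _make_adapter(shared_store, "auto", platform="mercari")  # MercariAdapter
ebay_adapter = _make_adapter(shared_store, "scraper")                      # ScrapedEbayAdapter
assert _adapter_name("auto", platform="mercari") == "mercari"
assert _adapter_name("scraper", platform="ebay") == "scraper"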

app/platforms/__init__.py

@@ -9,7 +9,7 @@ from app.db.models import Listing, Seller
 # Single source of truth for platform validation.
 # Phase 2 will extend this set as new adapters are implemented.
-SUPPORTED_PLATFORMS: frozenset[str] = frozenset({"ebay"})
+SUPPORTED_PLATFORMS: frozenset[str] = frozenset({"ebay", "mercari"})

 @dataclass

app/platforms/ebay/browser_pool.py

@@ -6,6 +6,7 @@ long-lived Playwright browser instances with fresh contexts ready to serve.
 Key design:
 - Pool slots: ``(xvfb_proc, pw_instance, browser, context, display_num, last_used_ts)``
   One headed Chromium browser per slot keeps the Kasada fingerprint clean.
+- Display numbering: :200-:399 (avoids host :0 and low-numbered kernel socket conflicts).
 - Thread safety: ``queue.Queue`` with blocking get (timeout=3s before fresh fallback).
 - Replenishment: after each use, the dirty context is closed and a new context is
   opened on the *same* browser, then returned to the queue. Browser launch overhead

@@ -33,15 +34,17 @@ from typing import Optional
 log = logging.getLogger(__name__)

-# Reuse the same display counter namespace as scraper.py to avoid collisions.
-# Pool uses :100-:199; scraper.py fallback uses :200-:299.
-_pool_display_counter = itertools.cycle(range(100, 200))
+# Display counter shared by pool warmup and _fetch_fresh fallback.
+# Range :200-:399 avoids low-numbered displays that may be pre-occupied by
+# the host X server or lingering kernel sockets from previous runs.
+_pool_display_counter = itertools.cycle(range(200, 400))

 _IDLE_TIMEOUT_SECS = 300  # 5 minutes
 _CLEANUP_INTERVAL_SECS = 60
 _QUEUE_TIMEOUT_SECS = 3.0
 _CHROMIUM_ARGS = ["--no-sandbox", "--disable-dev-shm-usage"]
+_XVFB_ARGS = ["-screen", "0", "1280x800x24", "-ac"]  # -ac: disable X auth (safe in isolated Docker)
 _USER_AGENT = (
     "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
     "(KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"

@@ -74,7 +77,7 @@ def _launch_slot() -> "_PooledBrowser":
     env["DISPLAY"] = display
     xvfb = subprocess.Popen(
-        ["Xvfb", display, "-screen", "0", "1280x800x24"],
+        ["Xvfb", display] + _XVFB_ARGS,
         stdout=subprocess.DEVNULL,
         stderr=subprocess.DEVNULL,
     )

@@ -230,7 +233,13 @@ class BrowserPool:
     # Core fetch
     # ------------------------------------------------------------------
-    def fetch_html(self, url: str, delay: float = 1.0) -> str:
+    def fetch_html(
+        self,
+        url: str,
+        delay: float = 1.0,
+        wait_for_selector: Optional[str] = None,
+        wait_for_timeout_ms: int = 2000,
+    ) -> str:
         """Navigate to *url* and return the rendered HTML.

         Borrows a browser context from the pool (blocks up to 3s), uses it to

@@ -238,6 +247,15 @@ class BrowserPool:
         Falls back to a fully fresh browser if the pool is empty after the
         timeout or if Playwright is unavailable.

+        Args:
+            wait_for_selector: CSS/data-testid selector to wait for before capturing
+                HTML (e.g. ``"[data-testid='SearchResults']"``). When set, the fixed
+                *wait_for_timeout_ms* sleep is skipped; the page is captured as soon
+                as the selector appears (or after 15s timeout, whichever comes first).
+            wait_for_timeout_ms: static post-navigation sleep in ms when
+                *wait_for_selector* is None. Default 2000; set higher (e.g. 8000)
+                for sites with JS challenge pages (Cloudflare Turnstile).
         """
         time.sleep(delay)

@@ -249,7 +267,11 @@ class BrowserPool:
         if slot is not None:
             try:
-                html = self._fetch_with_slot(slot, url)
+                html = self._fetch_with_slot(
+                    slot, url,
+                    wait_for_selector=wait_for_selector,
+                    wait_for_timeout_ms=wait_for_timeout_ms,
+                )
                 # Replenish: close dirty context, open fresh one, return to queue.
                 try:
                     fresh_slot = _replenish_slot(slot)

@@ -264,7 +286,11 @@ class BrowserPool:
                 # Fall through to fresh browser below.

         # Fallback: fresh browser (same code as old scraper._fetch_url).
-        return self._fetch_fresh(url)
+        return self._fetch_fresh(
+            url,
+            wait_for_selector=wait_for_selector,
+            wait_for_timeout_ms=wait_for_timeout_ms,
+        )

     # ------------------------------------------------------------------
     # Internal helpers

@@ -282,7 +308,13 @@ class BrowserPool:
             self._playwright_available = False
         return self._playwright_available

-    def _fetch_with_slot(self, slot: _PooledBrowser, url: str) -> str:
+    def _fetch_with_slot(
+        self,
+        slot: _PooledBrowser,
+        url: str,
+        wait_for_selector: Optional[str] = None,
+        wait_for_timeout_ms: int = 2000,
+    ) -> str:
         """Open a new page on *slot.ctx*, navigate to *url*, return HTML."""
         from playwright_stealth import Stealth

@@ -290,7 +322,13 @@ class BrowserPool:
         try:
             Stealth().apply_stealth_sync(page)
             page.goto(url, wait_until="domcontentloaded", timeout=30_000)
-            page.wait_for_timeout(2000)
+            if wait_for_selector:
+                try:
+                    page.wait_for_selector(wait_for_selector, timeout=15_000)
+                except Exception:
+                    pass  # selector didn't appear; return whatever loaded
+            else:
+                page.wait_for_timeout(wait_for_timeout_ms)
             return page.content()
         finally:
             try:

@@ -298,7 +336,12 @@ class BrowserPool:
             except Exception:
                 pass

-    def _fetch_fresh(self, url: str) -> str:
+    def _fetch_fresh(
+        self,
+        url: str,
+        wait_for_selector: Optional[str] = None,
+        wait_for_timeout_ms: int = 2000,
+    ) -> str:
         """Launch a fully fresh browser, fetch *url*, close everything."""
         import subprocess as _subprocess

@@ -307,7 +350,7 @@ class BrowserPool:
             from playwright_stealth import Stealth
         except ImportError as exc:
             raise RuntimeError(
-                "Playwright not installed — cannot fetch eBay pages. "
+                "Playwright not installed — cannot fetch pages. "
                 "Install playwright and playwright-stealth in the Docker image."
             ) from exc

@@ -317,10 +360,11 @@ class BrowserPool:
         env["DISPLAY"] = display
         xvfb = _subprocess.Popen(
-            ["Xvfb", display, "-screen", "0", "1280x800x24"],
+            ["Xvfb", display] + _XVFB_ARGS,
             stdout=_subprocess.DEVNULL,
             stderr=_subprocess.DEVNULL,
         )
+        time.sleep(0.3)  # wait for Xvfb to bind the display socket before Chromium starts
         try:
             with sync_playwright() as pw:
                 browser = pw.chromium.launch(

@@ -335,7 +379,13 @@ class BrowserPool:
                 page = ctx.new_page()
                 Stealth().apply_stealth_sync(page)
                 page.goto(url, wait_until="domcontentloaded", timeout=30_000)
-                page.wait_for_timeout(2000)
+                if wait_for_selector:
+                    try:
+                        page.wait_for_selector(wait_for_selector, timeout=15_000)
+                    except Exception:
+                        pass  # selector didn't appear; return whatever loaded
+                else:
+                    page.wait_for_timeout(wait_for_timeout_ms)
                 html = page.content()
                 browser.close()
             finally:
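
Caller sketch for the new parameters (illustrative only; get_pool() is the pool accessor
the adapters import, and the selector is the one the Mercari parser looks for):

from app.platforms.ebay.browser_pool import get_pool

html = get_pool().fetch_html(
    "https://www.mercari.com/search/?keyword=rtx+4090",
    delay=1.0,
    wait_for_selector="[data-testid='ItemContainer']",  # skip the fixed sleep once results render
)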

app/platforms/mercari/__init__.py (new file)

@@ -0,0 +1,4 @@
"""Mercari platform adapter."""
from app.platforms.mercari.adapter import MercariAdapter
__all__ = ["MercariAdapter"]

app/platforms/mercari/adapter.py (new file)

@@ -0,0 +1,173 @@
"""MercariAdapter — scraper-based Mercari platform adapter.
Trust signal coverage vs eBay:
  feedback_count    -- covered (NumSales from listing page)
  feedback_ratio    -- covered (ReviewStarsWrapper data-stars / 5)
  account_age_days  -- missing (requires seller profile page; future work)
  category_history  -- missing (not exposed in HTML; future work)
  price_vs_market   -- covered (computed by trust scorer from comps, same as eBay)
Because account_age and category_history are always None, TrustScore.score_is_partial
will be True for all Mercari results. The aggregator handles this correctly
by scoring only from available signals.
seller_platform_id on Listing objects holds the product_id (e.g. "m86032668393")
rather than the seller username, because search results don't expose seller identity.
get_seller() resolves the product_id to its seller by fetching the listing page.
The DB lookup key is (platform="mercari", platform_seller_id=product_id).
"""
from __future__ import annotations
import json
import logging
import time
from typing import Optional
from app.db.models import Listing, MarketComp, Seller
from app.db.store import Store
from app.platforms import PlatformAdapter, SearchFilters
from app.platforms.mercari.scraper import (
build_search_url,
parse_listing_html,
parse_search_html,
)
log = logging.getLogger(__name__)
_SELLER_CACHE_TTL_HOURS = 6
_BETWEEN_LISTING_FETCH_SECS = 1.5
class MercariAdapter(PlatformAdapter):
def __init__(self, store: Store) -> None:
self._store = store
def search(self, query: str, filters: SearchFilters) -> list[Listing]:
from app.platforms.ebay.browser_pool import get_pool
url = build_search_url(query, filters.max_price, filters.min_price)
log.info("mercari: fetching search URL: %s", url)
html = get_pool().fetch_html(
url,
delay=1.0,
wait_for_timeout_ms=8000,
)
raw_listings = parse_search_html(html)
listings: list[Listing] = []
seen: set[str] = set()
for raw in raw_listings:
pid = raw["product_id"]
if pid in seen:
continue
seen.add(pid)
listings.append(_normalise_listing(raw, query))
log.info("mercari: parsed %d listings for %r", len(listings), query)
# Client-side keyword filter (mirrors eBay scraper behaviour).
if filters.must_include:
listings = _apply_keyword_filter(listings, filters.must_include, filters.must_include_mode)
if filters.must_exclude:
listings = _apply_exclude_filter(listings, filters.must_exclude)
return listings
def get_seller(self, seller_platform_id: str) -> Optional[Seller]:
"""Fetch seller data from the listing page identified by seller_platform_id.
For Mercari, seller_platform_id is the product_id (e.g. "m86032668393")
because seller usernames aren't available from search results HTML.
"""
cached = self._store.get_seller("mercari", seller_platform_id)
if cached:
return cached
from app.platforms.ebay.browser_pool import get_pool
url = f"https://www.mercari.com/us/item/{seller_platform_id}/"
try:
time.sleep(_BETWEEN_LISTING_FETCH_SECS)
html = get_pool().fetch_html(
url,
delay=0.5,
wait_for_timeout_ms=6000,
)
raw = parse_listing_html(html, seller_platform_id)
seller = _normalise_seller(raw)
self._store.save_seller(seller)
return seller
except Exception as exc:
log.warning("mercari: get_seller failed for %s: %s", seller_platform_id, exc)
return None
def get_completed_sales(self, query: str, pages: int = 1) -> list[Listing]:
"""Mercari sold-listing comps — stubbed for Phase 3.
Mercari exposes sold listings via ?status=ITEM_STATUS_TRADING but the
data is sparse. Phase 3 will implement comp extraction here; for now
the trust scorer falls back to price_vs_market=None (partial score).
"""
return []
# ---------------------------------------------------------------------------
# Normalisation helpers
# ---------------------------------------------------------------------------
def _normalise_listing(raw: dict, query: str) -> Listing:
return Listing(
platform="mercari",
platform_listing_id=raw["product_id"],
title=raw["title"],
price=raw["price"],
currency="USD",
condition="", # not available from search results; get_seller() populates this
seller_platform_id=raw["product_id"], # see module docstring
url=raw["url"],
photo_urls=[raw["photo_url"]] if raw.get("photo_url") else [],
listing_age_days=0,
buying_format="fixed_price",
category_name=None,
)
def _normalise_seller(raw: dict) -> Seller:
stars = raw.get("stars", 0.0)
feedback_ratio = min(stars / 5.0, 1.0) if stars > 0 else 0.0
return Seller(
platform="mercari",
platform_seller_id=raw["product_id"],
username=raw.get("username", ""),
account_age_days=None, # not available without seller profile page
feedback_count=raw.get("num_sales", 0),
feedback_ratio=feedback_ratio,
category_history_json=json.dumps({}),
)
def _apply_keyword_filter(listings: list[Listing], must_include: list[str], mode: str) -> list[Listing]:
if not must_include:
return listings
def _matches(listing: Listing) -> bool:
title = listing.title.lower()
if mode == "any":
return any(kw.lower() in title for kw in must_include)
# "all" (default) and "groups" both require all terms present
return all(kw.lower() in title for kw in must_include)
return [l for l in listings if _matches(l)]
def _apply_exclude_filter(listings: list[Listing], must_exclude: list[str]) -> list[Listing]:
if not must_exclude:
return listings
def _clean(listing: Listing) -> bool:
title = listing.title.lower()
return not any(term.lower() in title for term in must_exclude)
return [l for l in listings if _clean(l)]
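
Usage sketch (illustrative, not part of the commit; the DB path below is hypothetical,
the product id is the example from the module docstring):

from app.db.store import Store
from app.platforms.mercari import MercariAdapter

adapter = MercariAdapter(Store("/data/shared.db"))  # hypothetical DB path
seller = adapter.get_seller("m86032668393")         # keyed by product id, not username
if seller:
    # account_age_days stays None, so any score built from this seller is partial
    print(seller.feedback_count, seller.feedback_ratio)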

app/platforms/mercari/scraper.py (new file)

@@ -0,0 +1,165 @@
"""Mercari search + listing page scraper.
Uses the shared eBay browser pool (headed Chromium + Xvfb + playwright-stealth)
which already bypasses Cloudflare Turnstile. Import the pool singleton from
ebay.browser_pool so both platforms share the same warm Chromium instances.
Seller data is NOT available from search results HTML, only from individual
listing pages. The adapter lazily fetches listing pages in get_seller().
"""
from __future__ import annotations
import logging
import re
from typing import Optional
from urllib.parse import urlencode
from bs4 import BeautifulSoup, NavigableString
log = logging.getLogger(__name__)
_BASE = "https://www.mercari.com"
_SEARCH_PATH = "/search/"
_ITEM_PATH = "/us/item/"
_PRICE_RE = re.compile(r"[\d,]+\.?\d*")
_POSTED_RE = re.compile(r"(\d{2})/(\d{2})/(\d{2,4})") # MM/DD/YY or MM/DD/YYYY
def build_search_url(query: str, max_price: Optional[float] = None, min_price: Optional[float] = None) -> str:
# No explicit sortBy — Mercari's default (relevance) is the most useful order.
# "sortBy=SORT_SCORE" was a deprecated value that returns an empty results page.
params: dict = {"keyword": query}
# Mercari accepts priceMin/priceMax as whole dollar strings (not cents)
if min_price is not None and min_price > 0:
params["priceMin"] = str(int(min_price))
if max_price is not None and max_price > 0:
params["priceMax"] = str(int(max_price))
return f"{_BASE}{_SEARCH_PATH}?{urlencode(params)}"
def parse_search_html(html: str) -> list[dict]:
"""Parse Mercari search results HTML into a list of raw listing dicts."""
soup = BeautifulSoup(html, "html.parser")
results: list[dict] = []
for item in soup.find_all(attrs={"data-testid": "ItemContainer"}):
pid = item.get("data-productid", "")
if not pid:
continue
parent = item.parent
href = parent.get("href") if parent and parent.name == "a" else None
url = f"{_BASE}{href}" if href else f"{_BASE}{_ITEM_PATH}{pid}/"
name_el = item.find(attrs={"data-testid": "ItemName"})
title = name_el.get_text(strip=True) if name_el else ""
price = _extract_current_price(item)
img_el = item.find("img")
photo_url = img_el.get("src", "") if img_el else ""
results.append({
"product_id": pid,
"url": url,
"title": title,
"price": price,
"photo_url": photo_url,
"brand": item.get("data-brand", ""),
"is_on_sale": item.get("data-is-on-sale") == "true",
})
return results
def _extract_current_price(item: BeautifulSoup) -> float:
"""Return the current (non-strikethrough) price from an ItemContainer."""
price_el = item.find(attrs={"data-testid": "ProductThumbItemPrice"})
if not price_el:
return 0.0
# Direct text nodes are the current price; the nested span is the original.
price_text = "".join(
str(c) for c in price_el.children if isinstance(c, NavigableString)
).strip()
m = _PRICE_RE.search(price_text)
if m:
try:
return float(m.group().replace(",", ""))
except ValueError:
pass
return 0.0
def parse_listing_html(html: str, product_id: str) -> dict:
"""Parse a Mercari listing page into a raw seller dict."""
soup = BeautifulSoup(html, "html.parser")
def _text(testid: str) -> str:
el = soup.find(attrs={"data-testid": testid})
return el.get_text(strip=True) if el else ""
username_raw = _text("ItemDetailsSellerUserName")
username = username_raw.lstrip("@")
num_sales = _safe_int(_text("NumSales"))
rating_count = _safe_int(_text("SellerRatingCount"))
stars = 0.0
rw = soup.find(attrs={"data-testid": "ReviewStarsWrapper"})
if rw:
try:
stars = float(rw.get("data-stars", 0))
except (ValueError, TypeError):
pass
condition = _text("ItemDetailsCondition").lower()
posted_text = _text("ItemDetailsPosted")
listing_age_days = _parse_listing_age(posted_text)
price_text = _text("ItemPrice")
price = 0.0
m = _PRICE_RE.search(price_text.replace(",", ""))
if m:
try:
price = float(m.group())
except ValueError:
pass
return {
"product_id": product_id,
"username": username,
"num_sales": num_sales, # completed sales → maps to feedback_count
"rating_count": rating_count, # number of reviews (additional signal)
"stars": stars, # 0.05.0 → divide by 5 = feedback_ratio
"condition": condition,
"listing_age_days": listing_age_days,
"price": price,
}
def _safe_int(text: str) -> int:
m = _PRICE_RE.search(text.replace(",", ""))
if m:
try:
return int(float(m.group()))
except ValueError:
pass
return 0
def _parse_listing_age(posted_text: str) -> int:
"""Convert a posted date like '04/10/26' to days since posted."""
from datetime import datetime, timezone
m = _POSTED_RE.search(posted_text)
if not m:
return 0
try:
month, day, year = int(m.group(1)), int(m.group(2)), int(m.group(3))
if year < 100:
year += 2000
posted = datetime(year, month, day, tzinfo=timezone.utc)
return (datetime.now(timezone.utc) - posted).days
except (ValueError, OverflowError):
return 0
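
Quick round-trip sketch for the pure functions above (illustrative only; the HTML fragment
is a minimal stand-in shaped like the attributes the parser reads, not real Mercari markup):

from app.platforms.mercari.scraper import build_search_url, parse_search_html

print(build_search_url("rtx 4090", max_price=800))
# https://www.mercari.com/search/?keyword=rtx+4090&priceMax=800  (no sortBy)

sample = """
<a href="/us/item/m86032668393/">
  <div data-testid="ItemContainer" data-productid="m86032668393" data-brand="NVIDIA">
    <span data-testid="ItemName">RTX 4090 Founders Edition</span>
    <span data-testid="ProductThumbItemPrice">$1,499.99<span>$1,599.99</span></span>
    <img src="https://example.invalid/photo.jpg">
  </div>
</a>
"""
rows = parse_search_html(sample)
assert rows[0]["product_id"] == "m86032668393"
assert rows[0]["price"] == 1499.99  # current price, not the struck-through original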

scripts/debug_fetch_fresh.py (new file)

@@ -0,0 +1,64 @@
"""Reproduce the exact FastAPI code path: pool warmup → slot close → _fetch_fresh.
Run inside the container:
docker exec -it snipe-api-1 python /app/snipe/scripts/debug_fetch_fresh.py
"""
import sys, time, threading
sys.path.insert(0, '/app/snipe')
from bs4 import BeautifulSoup
from app.platforms.ebay.browser_pool import BrowserPool, _close_slot
URL = "https://www.mercari.com/search/?keyword=rtx+4090&sortBy=SORT_SCORE&priceMax=800"
print("=== Test 1: _fetch_fresh with no pool (baseline) ===", flush=True)
pool0 = BrowserPool(size=0)
t0 = time.time()
html = pool0._fetch_fresh(URL, wait_for_timeout_ms=8000)
items = BeautifulSoup(html, "html.parser").find_all(attrs={"data-testid": "ItemContainer"})
print(f"Items: {len(items)}, HTML: {len(html)}b, elapsed: {time.time()-t0:.1f}s", flush=True)
print("\n=== Test 2: pool warmup (size=2), grab slot, close it, then _fetch_fresh ===", flush=True)
pool2 = BrowserPool(size=2)
# Warmup in background (blocks until done)
warm_done = threading.Event()
def do_warmup():
pool2.start()
warm_done.set()
t = threading.Thread(target=do_warmup, daemon=True)
t.start()
warm_done.wait(timeout=30)
print(f"Pool size after warmup: {pool2._q.qsize()}", flush=True)
# Grab a slot and close it (simulating the thread-error path)
import queue
try:
slot = pool2._q.get(timeout=3.0)
print(f"Got slot on display :{slot.display_num}", flush=True)
_close_slot(slot)
print("Slot closed", flush=True)
except queue.Empty:
print("Pool empty — no slot to simulate", flush=True)
# Now call _fetch_fresh in this thread (same as FastAPI handler thread)
print("Calling _fetch_fresh from warmup-thread context...", flush=True)
t0 = time.time()
html2 = pool2._fetch_fresh(URL, wait_for_timeout_ms=8000)
items2 = BeautifulSoup(html2, "html.parser").find_all(attrs={"data-testid": "ItemContainer"})
print(f"Items: {len(items2)}, HTML: {len(html2)}b, elapsed: {time.time()-t0:.1f}s", flush=True)
# Save HTML for inspection if empty
if len(items2) == 0:
with open("/tmp/debug_mercari.html", "w") as f:
f.write(html2)
print("Saved HTML to /tmp/debug_mercari.html", flush=True)
title = BeautifulSoup(html2, "html.parser").find("title")
print("Page title:", title.get_text() if title else "(none)", flush=True)
if "Just a moment" in html2 or "turnstile" in html2.lower():
print("BLOCKED: Cloudflare challenge", flush=True)
else:
body = BeautifulSoup(html2, "html.parser").find("body")
if body:
print("Body snippet:", body.get_text(separator=" ", strip=True)[:300], flush=True)

scripts/probe_mercari.py (new file, 113 lines)

@@ -0,0 +1,113 @@
"""One-shot Mercari probe using the same headed Chromium + Xvfb + stealth stack
as the eBay scraper. Run inside the snipe-api container:
docker exec -it snipe-api-1 python /app/scripts/probe_mercari.py
"""
from __future__ import annotations
import itertools
import os
import subprocess
import sys
import time
_display_counter = itertools.count(200)
_CHROMIUM_ARGS = ["--no-sandbox", "--disable-dev-shm-usage"]
_USER_AGENT = (
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
)
SEARCH_URL = "https://www.mercari.com/search/?keyword=rtx+4090"
# Give Cloudflare challenge time to resolve (if it does)
WAIT_MS = 8_000
def probe(url: str) -> str:
from playwright.sync_api import sync_playwright
from playwright_stealth import Stealth
display_num = next(_display_counter)
display = f":{display_num}"
env = os.environ.copy()
env["DISPLAY"] = display
xvfb = subprocess.Popen(
["Xvfb", display, "-screen", "0", "1280x800x24"],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
time.sleep(0.5)
try:
with sync_playwright() as pw:
browser = pw.chromium.launch(
headless=False,
env=env,
args=_CHROMIUM_ARGS,
)
ctx = browser.new_context(
user_agent=_USER_AGENT,
viewport={"width": 1280, "height": 800},
)
page = ctx.new_page()
Stealth().apply_stealth_sync(page)
print(f"[probe] Navigating to {url}", flush=True)
response = page.goto(url, wait_until="domcontentloaded", timeout=40_000)
print(f"[probe] HTTP status: {response.status if response else 'unknown'}", flush=True)
print(f"[probe] Waiting {WAIT_MS}ms for JS / Turnstile …", flush=True)
page.wait_for_timeout(WAIT_MS)
html = page.content()
title = page.title()
print(f"[probe] Page title: {title!r}", flush=True)
browser.close()
finally:
xvfb.terminate()
xvfb.wait()
return html
def analyse(html: str) -> None:
from bs4 import BeautifulSoup
soup = BeautifulSoup(html, "html.parser")
# Cloudflare challenge indicators
if "Just a moment" in html or "cf-challenge" in html or "turnstile" in html.lower():
print("[result] BLOCKED — Cloudflare Turnstile still active")
return
print("[result] Cloudflare challenge NOT detected — page appears to have loaded")
# Try to find listing cards
# Mercari US uses data-testid or item cards in the DOM
candidates = [
soup.select("[data-testid='ItemCell']"),
soup.select("[data-testid='item-cell']"),
soup.select("li[data-testid]"),
soup.select(".merList .merListItem"),
soup.select("[class*='ItemCell']"),
soup.select("[class*='item-cell']"),
]
for sel_result in candidates:
if sel_result:
print(f"[result] Found {len(sel_result)} listing card(s) via selector")
card = sel_result[0]
print(f"[result] First card snippet:\n{card.prettify()[:800]}")
return
# Fallback: show body text summary
body = soup.find("body")
text = body.get_text(separator=" ", strip=True)[:500] if body else html[:500]
print(f"[result] No listing cards found. Body text preview:\n{text}")
# Save full HTML for manual inspection
out = "/tmp/mercari_probe.html"
with open(out, "w") as fh:
fh.write(html)
print(f"[result] Full HTML saved to {out}")
if __name__ == "__main__":
html = probe(SEARCH_URL)
analyse(html)

web/src/views/SearchView.vue

@@ -698,7 +698,7 @@ const parsedMustIncludeGroups = computed(() =>
 const PLATFORMS: { value: string; label: string; available: boolean }[] = [
   { value: 'ebay', label: 'eBay', available: true },
-  { value: 'mercari', label: 'Mercari', available: false },
+  { value: 'mercari', label: 'Mercari', available: true },
   { value: 'poshmark', label: 'Poshmark', available: false },
 ]