fix(browser-pool): replace queue with thread-local storage to fix Playwright cross-thread crash (#53)

Playwright's sync API binds its greenlet event loop to the creating thread.
Sharing pre-warmed slots across threads caused "cannot switch to a different
thread" panics under uvicorn. New design: each worker thread owns its own
Playwright instance created lazily on first fetch_html() call. A registry
dict keyed by thread-id lets stop() close all slots at shutdown. Removes
ThreadPoolExecutor warmup and idle-cleanup daemon thread entirely.
This commit is contained in:
pyr0ball 2026-05-04 09:27:20 -07:00
parent bccedb1fe5
commit 108f63b4f2
2 changed files with 228 additions and 288 deletions

View file

@ -1,60 +1,58 @@
"""Pre-warmed Chromium browser pool for the eBay scraper. """Thread-local Playwright browser manager for the eBay scraper.
Eliminates cold-start latency (5-10s per call) by keeping a small pool of Each uvicorn worker thread that calls fetch_html() gets its own Playwright
long-lived Playwright browser instances with fresh contexts ready to serve. instance, browser, and context created lazily on first use. This avoids
the "cannot switch to a different thread" error that arises when Playwright
sync API instances are shared across threads (they bind their greenlet event
loop to the creating thread).
Key design: Key design:
- Pool slots: ``(xvfb_proc, pw_instance, browser, context, display_num, last_used_ts)`` - Thread-local: _thread_local.slot holds the _PooledBrowser for the current
One headed Chromium browser per slot keeps the Kasada fingerprint clean. thread. No slot is ever handed to another thread.
- Display numbering: :200-:399 (avoids host :0 and low-numbered kernel socket conflicts). - Lazy creation: slots are created on first fetch_html() call per thread, not
- Thread safety: ``queue.Queue`` with blocking get (timeout=3s before fresh fallback). at startup. start() is a lightweight lifecycle marker only.
- Replenishment: after each use, the dirty context is closed and a new context is - Registry: _slot_registry (keyed by thread-id) lets stop() close every active
opened on the *same* browser, then returned to the queue. Browser launch overhead slot across all threads without walking thread-local storage.
is only paid at startup and during idle-cleanup replenishment. - Replenishment: after each use the dirty context is closed and a fresh one
- Idle cleanup: daemon thread closes slots idle for >5 minutes to avoid memory leaks opened on the same browser. Browser launch overhead is paid at most once
when the service is quiet. per worker thread lifetime.
- Graceful degradation: if Playwright / Xvfb is unavailable (host-side test env), - Graceful degradation: if Playwright / Xvfb is unavailable, fetch_html falls
``fetch_html`` falls back to launching a fresh browser per call — same behavior back to _fetch_fresh (identical behavior to before this module existed).
as before this module existed.
Pool size is controlled via ``BROWSER_POOL_SIZE`` env var (default: 2). Pool size is read from BROWSER_POOL_SIZE env var (default: 2) but is now a
soft limit used only for documentation; actual concurrency is bounded by
uvicorn's thread count.
""" """
from __future__ import annotations from __future__ import annotations
import itertools import itertools
import logging import logging
import os import os
import queue
import subprocess import subprocess
import threading import threading
import time import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Optional from typing import Optional
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
# Display counter shared by pool warmup and _fetch_fresh fallback.
# Range :200-:399 avoids low-numbered displays that may be pre-occupied by
# the host X server or lingering kernel sockets from previous runs.
_pool_display_counter = itertools.cycle(range(200, 400)) _pool_display_counter = itertools.cycle(range(200, 400))
_IDLE_TIMEOUT_SECS = 300 # 5 minutes
_CLEANUP_INTERVAL_SECS = 60
_QUEUE_TIMEOUT_SECS = 3.0
_CHROMIUM_ARGS = ["--no-sandbox", "--disable-dev-shm-usage"] _CHROMIUM_ARGS = ["--no-sandbox", "--disable-dev-shm-usage"]
_XVFB_ARGS = ["-screen", "0", "1280x800x24", "-ac"] # -ac: disable X auth (safe in isolated Docker) _XVFB_ARGS = ["-screen", "0", "1280x800x24", "-ac"]
_USER_AGENT = ( _USER_AGENT = (
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 " "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36" "(KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
) )
_VIEWPORT = {"width": 1280, "height": 800} _VIEWPORT = {"width": 1280, "height": 800}
# Thread-local storage: each thread gets its own _PooledBrowser slot.
_thread_local = threading.local()
@dataclass @dataclass
class _PooledBrowser: class _PooledBrowser:
"""One slot in the browser pool.""" """One browser slot, bound to a single thread."""
xvfb: subprocess.Popen xvfb: subprocess.Popen
pw: object # playwright instance (sync_playwright().__enter__()) pw: object # playwright instance (sync_playwright().__enter__())
browser: object # playwright Browser browser: object # playwright Browser
@ -63,13 +61,13 @@ class _PooledBrowser:
last_used_ts: float = field(default_factory=time.time) last_used_ts: float = field(default_factory=time.time)
def _launch_slot() -> "_PooledBrowser": def _launch_slot() -> _PooledBrowser:
"""Launch a new Xvfb display + headed Chromium browser + fresh context. """Launch a new Xvfb display + headed Chromium browser + fresh context.
Raises on failure — callers must catch and handle gracefully. Must be called from the thread that will use the slot.
""" """
from playwright.sync_api import sync_playwright from playwright.sync_api import sync_playwright
from playwright_stealth import Stealth # noqa: F401 — imported here to confirm availability from playwright_stealth import Stealth # noqa: F401
display_num = next(_pool_display_counter) display_num = next(_pool_display_counter)
display = f":{display_num}" display = f":{display_num}"
@ -81,7 +79,6 @@ def _launch_slot() -> "_PooledBrowser":
stdout=subprocess.DEVNULL, stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
) )
# Small grace period for Xvfb to bind the display socket.
time.sleep(0.3) time.sleep(0.3)
pw = sync_playwright().start() pw = sync_playwright().start()
@ -112,7 +109,7 @@ def _launch_slot() -> "_PooledBrowser":
def _close_slot(slot: _PooledBrowser) -> None: def _close_slot(slot: _PooledBrowser) -> None:
"""Cleanly close a pool slot: context → browser → Playwright → Xvfb.""" """Cleanly close a slot: context → browser → Playwright → Xvfb."""
try: try:
slot.ctx.close() slot.ctx.close()
except Exception: except Exception:
@ -133,11 +130,7 @@ def _close_slot(slot: _PooledBrowser) -> None:
def _replenish_slot(slot: _PooledBrowser) -> _PooledBrowser: def _replenish_slot(slot: _PooledBrowser) -> _PooledBrowser:
"""Close the used context and open a fresh one on the same browser. """Close the used context and open a fresh one on the same browser."""
Returns a new _PooledBrowser sharing the same xvfb/pw/browser but with a
clean context — avoids paying browser launch overhead on every fetch.
"""
try: try:
slot.ctx.close() slot.ctx.close()
except Exception: except Exception:
@ -158,26 +151,27 @@ def _replenish_slot(slot: _PooledBrowser) -> _PooledBrowser:
class BrowserPool: class BrowserPool:
"""Thread-safe pool of pre-warmed Playwright browser contexts.""" """Thread-local Playwright browser manager.
Each thread that calls fetch_html() owns its own browser instance.
No slots are shared between threads.
"""
def __init__(self, size: int = 2) -> None: def __init__(self, size: int = 2) -> None:
self._size = size self._size = size
self._q: queue.Queue[_PooledBrowser] = queue.Queue()
self._lock = threading.Lock() self._lock = threading.Lock()
self._started = False self._started = False
self._stopped = False self._stopped = False
self._playwright_available: Optional[bool] = None # cached after first check self._playwright_available: Optional[bool] = None
# Registry of all active slots keyed by thread id — used only by stop().
self._slot_registry: dict[int, _PooledBrowser] = {}
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Lifecycle # Lifecycle
# ------------------------------------------------------------------ # ------------------------------------------------------------------
def start(self) -> None: def start(self) -> None:
"""Pre-warm N browser slots in background threads. """Mark the pool as started. Slots are created lazily per thread."""
Non-blocking: returns immediately; slots appear in the queue as they
finish launching. Safe to call multiple times (no-op after first).
"""
with self._lock: with self._lock:
if self._started: if self._started:
return return
@ -190,43 +184,19 @@ class BrowserPool:
) )
return return
def _warm_one(_: int) -> None: log.info("BrowserPool: started (thread-local mode, size hint=%d)", self._size)
try:
slot = _launch_slot()
self._q.put(slot)
log.debug("BrowserPool: slot :%d ready", slot.display_num)
except Exception as exc:
log.warning("BrowserPool: pre-warm failed: %s", exc)
with ThreadPoolExecutor(max_workers=self._size) as ex:
futures = [ex.submit(_warm_one, i) for i in range(self._size)]
# Don't wait — executor exits after submitting, threads continue.
# Actually ThreadPoolExecutor.__exit__ waits for completion, which
# is fine: pre-warming completes in background relative to FastAPI
# startup because this whole method is called from a thread.
for f in as_completed(futures):
pass # propagate exceptions via logging, not raises
_idle_cleaner = threading.Thread(
target=self._idle_cleanup_loop, daemon=True, name="browser-pool-idle-cleaner"
)
_idle_cleaner.start()
log.info("BrowserPool: started with %d slots", self._q.qsize())
def stop(self) -> None: def stop(self) -> None:
"""Drain and close all pool slots. Called at FastAPI shutdown.""" """Close all active slots across all threads."""
with self._lock: with self._lock:
self._stopped = True self._stopped = True
registry_snapshot = dict(self._slot_registry)
closed = 0 closed = 0
while True: for slot in registry_snapshot.values():
try:
slot = self._q.get_nowait()
_close_slot(slot) _close_slot(slot)
closed += 1 closed += 1
except queue.Empty: self._slot_registry.clear()
break
log.info("BrowserPool: stopped, closed %d slot(s)", closed) log.info("BrowserPool: stopped, closed %d slot(s)", closed)
# ------------------------------------------------------------------ # ------------------------------------------------------------------
@ -242,28 +212,13 @@ class BrowserPool:
) -> str: ) -> str:
"""Navigate to *url* and return the rendered HTML. """Navigate to *url* and return the rendered HTML.
Borrows a browser context from the pool (blocks up to 3s), uses it to Uses the calling thread's browser slot (creates one if needed).
fetch the page, then replenishes the slot with a fresh context. Falls back to a fresh browser if Playwright is unavailable or the
slot fails.
Falls back to a fully fresh browser if the pool is empty after the
timeout or if Playwright is unavailable.
Args:
wait_for_selector: CSS/data-testid selector to wait for before capturing
HTML (e.g. ``"[data-testid='SearchResults']"``). When set, the fixed
*wait_for_timeout_ms* sleep is skipped — the page is captured as soon
as the selector appears (or after 15s timeout, whichever comes first).
wait_for_timeout_ms: static post-navigation sleep in ms when
*wait_for_selector* is None. Default 2000; set higher (e.g. 8000)
for sites with JS challenge pages (Cloudflare Turnstile).
""" """
time.sleep(delay) time.sleep(delay)
slot: Optional[_PooledBrowser] = None slot = self._get_or_create_thread_slot()
try:
slot = self._q.get(timeout=_QUEUE_TIMEOUT_SECS)
except queue.Empty:
log.debug("BrowserPool: pool empty after %.1fs — using fresh browser", _QUEUE_TIMEOUT_SECS)
if slot is not None: if slot is not None:
try: try:
@ -272,32 +227,65 @@ class BrowserPool:
wait_for_selector=wait_for_selector, wait_for_selector=wait_for_selector,
wait_for_timeout_ms=wait_for_timeout_ms, wait_for_timeout_ms=wait_for_timeout_ms,
) )
# Replenish: close dirty context, open fresh one, return to queue.
try: try:
fresh_slot = _replenish_slot(slot) fresh_slot = _replenish_slot(slot)
self._q.put(fresh_slot) self._register_slot(fresh_slot)
except Exception as exc: except Exception as exc:
log.warning("BrowserPool: replenish failed, slot discarded: %s", exc) log.warning("BrowserPool: replenish failed, slot discarded: %s", exc)
_close_slot(slot) _close_slot(slot)
self._unregister_slot()
return html return html
except Exception as exc: except Exception as exc:
log.warning("BrowserPool: pooled fetch failed (%s) — closing slot", exc) log.warning("BrowserPool: pooled fetch failed (%s) — closing slot", exc)
_close_slot(slot) _close_slot(slot)
# Fall through to fresh browser below. self._unregister_slot()
# Fallback: fresh browser (same code as old scraper._fetch_url).
return self._fetch_fresh( return self._fetch_fresh(
url, url,
wait_for_selector=wait_for_selector, wait_for_selector=wait_for_selector,
wait_for_timeout_ms=wait_for_timeout_ms, wait_for_timeout_ms=wait_for_timeout_ms,
) )
# ------------------------------------------------------------------
# Thread-local slot management
# ------------------------------------------------------------------
def _get_or_create_thread_slot(self) -> Optional[_PooledBrowser]:
    """Fetch the calling thread's browser slot, launching one on first use.

    Returns:
        The slot stored in thread-local storage for this thread, or a newly
        launched and registered slot if none is stored. ``None`` when
        ``_check_playwright()`` returns False or when launching/registering
        a new slot raises (the failure is logged as a warning).
    """
    if not self._check_playwright():
        return None

    # Fresh threads have no ``slot`` attribute yet, hence the getattr default.
    existing: Optional[_PooledBrowser] = getattr(_thread_local, "slot", None)
    if existing is not None:
        return existing

    try:
        launched = _launch_slot()
        self._register_slot(launched)
        log.debug("BrowserPool: launched slot :%d for thread %d",
                  launched.display_num, threading.get_ident())
    except Exception as exc:
        log.warning("BrowserPool: slot launch failed: %s", exc)
        return None
    return launched
def _register_slot(self, slot: _PooledBrowser) -> None:
    """Record *slot* as the calling thread's slot.

    The slot is stored both in module-level thread-local storage and in
    this pool's registry under the calling thread's identifier.
    """
    owner_id = threading.get_ident()
    _thread_local.slot = slot
    with self._lock:
        self._slot_registry[owner_id] = slot
def _unregister_slot(self) -> None:
    """Forget the calling thread's slot.

    Resets the thread-local ``slot`` to None and drops this thread's entry
    from the registry; a missing registry entry is not an error.
    """
    owner_id = threading.get_ident()
    _thread_local.slot = None
    with self._lock:
        self._slot_registry.pop(owner_id, None)
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Internal helpers # Internal helpers
# ------------------------------------------------------------------ # ------------------------------------------------------------------
def _check_playwright(self) -> bool: def _check_playwright(self) -> bool:
"""Return True if Playwright and Xvfb are importable/runnable."""
if self._playwright_available is not None: if self._playwright_available is not None:
return self._playwright_available return self._playwright_available
try: try:
@ -315,7 +303,6 @@ class BrowserPool:
wait_for_selector: Optional[str] = None, wait_for_selector: Optional[str] = None,
wait_for_timeout_ms: int = 2000, wait_for_timeout_ms: int = 2000,
) -> str: ) -> str:
"""Open a new page on *slot.ctx*, navigate to *url*, return HTML."""
from playwright_stealth import Stealth from playwright_stealth import Stealth
page = slot.ctx.new_page() page = slot.ctx.new_page()
@ -326,7 +313,7 @@ class BrowserPool:
try: try:
page.wait_for_selector(wait_for_selector, timeout=15_000) page.wait_for_selector(wait_for_selector, timeout=15_000)
except Exception: except Exception:
pass # selector didn't appear; return whatever loaded pass
else: else:
page.wait_for_timeout(wait_for_timeout_ms) page.wait_for_timeout(wait_for_timeout_ms)
return page.content() return page.content()
@ -342,7 +329,6 @@ class BrowserPool:
wait_for_selector: Optional[str] = None, wait_for_selector: Optional[str] = None,
wait_for_timeout_ms: int = 2000, wait_for_timeout_ms: int = 2000,
) -> str: ) -> str:
"""Launch a fully fresh browser, fetch *url*, close everything."""
import subprocess as _subprocess import subprocess as _subprocess
try: try:
@ -364,7 +350,7 @@ class BrowserPool:
stdout=_subprocess.DEVNULL, stdout=_subprocess.DEVNULL,
stderr=_subprocess.DEVNULL, stderr=_subprocess.DEVNULL,
) )
time.sleep(0.3) # wait for Xvfb to bind the display socket before Chromium starts time.sleep(0.3)
try: try:
with sync_playwright() as pw: with sync_playwright() as pw:
browser = pw.chromium.launch( browser = pw.chromium.launch(
@ -383,7 +369,7 @@ class BrowserPool:
try: try:
page.wait_for_selector(wait_for_selector, timeout=15_000) page.wait_for_selector(wait_for_selector, timeout=15_000)
except Exception: except Exception:
pass # selector didn't appear; return whatever loaded pass
else: else:
page.wait_for_timeout(wait_for_timeout_ms) page.wait_for_timeout(wait_for_timeout_ms)
html = page.content() html = page.content()
@ -394,32 +380,6 @@ class BrowserPool:
return html return html
def _idle_cleanup_loop(self) -> None:
"""Daemon thread: drain slots idle for >5 minutes every 60 seconds."""
while not self._stopped:
time.sleep(_CLEANUP_INTERVAL_SECS)
if self._stopped:
break
now = time.time()
idle_cutoff = now - _IDLE_TIMEOUT_SECS
# Drain the entire queue, keep non-idle slots, close idle ones.
kept: list[_PooledBrowser] = []
closed = 0
while True:
try:
slot = self._q.get_nowait()
except queue.Empty:
break
if slot.last_used_ts < idle_cutoff:
_close_slot(slot)
closed += 1
else:
kept.append(slot)
for slot in kept:
self._q.put(slot)
if closed:
log.info("BrowserPool: idle cleanup closed %d slot(s)", closed)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Module-level singleton # Module-level singleton
@ -430,11 +390,7 @@ _pool_lock = threading.Lock()
def get_pool() -> BrowserPool: def get_pool() -> BrowserPool:
"""Return the module-level BrowserPool singleton (creates it if needed). """Return the module-level BrowserPool singleton (creates it if needed)."""
Pool size is read from ``BROWSER_POOL_SIZE`` env var (default: 2).
Call ``get_pool().start()`` at FastAPI startup to pre-warm slots.
"""
global _pool global _pool
if _pool is None: if _pool is None:
with _pool_lock: with _pool_lock:

View file

@ -1,16 +1,15 @@
"""Tests for app.platforms.ebay.browser_pool. """Tests for app.platforms.ebay.browser_pool (thread-local design).
All tests run without real Chromium / Xvfb / Playwright. All tests run without real Chromium / Xvfb / Playwright.
Playwright, Xvfb subprocess calls, and Stealth are mocked throughout. Playwright, Xvfb subprocess calls, and Stealth are mocked throughout.
""" """
from __future__ import annotations from __future__ import annotations
import queue
import subprocess import subprocess
import threading import threading
import time import time
from typing import Any from typing import Any
from unittest.mock import MagicMock, patch, call from unittest.mock import MagicMock, patch
import pytest import pytest
@ -19,40 +18,35 @@ import pytest
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def _reset_pool_singleton(): def _reset_pool_singleton():
"""Force the module-level _pool singleton back to None."""
import app.platforms.ebay.browser_pool as _mod import app.platforms.ebay.browser_pool as _mod
_mod._pool = None _mod._pool = None
# --------------------------------------------------------------------------- def _reset_thread_local():
# Fixtures import app.platforms.ebay.browser_pool as _mod
# --------------------------------------------------------------------------- _mod._thread_local.slot = None
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def reset_singleton(): def reset_pool():
"""Reset the singleton before and after every test."""
_reset_pool_singleton() _reset_pool_singleton()
_reset_thread_local()
yield yield
_reset_pool_singleton() _reset_pool_singleton()
_reset_thread_local()
def _make_fake_slot(): def _make_fake_slot():
"""Build a mock _PooledBrowser with all necessary attributes."""
from app.platforms.ebay.browser_pool import _PooledBrowser from app.platforms.ebay.browser_pool import _PooledBrowser
xvfb = MagicMock(spec=subprocess.Popen) xvfb = MagicMock(spec=subprocess.Popen)
pw = MagicMock() pw = MagicMock()
browser = MagicMock() browser = MagicMock()
ctx = MagicMock() ctx = MagicMock()
slot = _PooledBrowser( return _PooledBrowser(
xvfb=xvfb, xvfb=xvfb, pw=pw, browser=browser, ctx=ctx,
pw=pw, display_num=100, last_used_ts=time.time(),
browser=browser,
ctx=ctx,
display_num=100,
last_used_ts=time.time(),
) )
return slot
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -62,9 +56,7 @@ def _make_fake_slot():
class TestGetPoolSingleton: class TestGetPoolSingleton:
def test_returns_same_instance(self): def test_returns_same_instance(self):
from app.platforms.ebay.browser_pool import get_pool, BrowserPool from app.platforms.ebay.browser_pool import get_pool, BrowserPool
p1 = get_pool() assert get_pool() is get_pool()
p2 = get_pool()
assert p1 is p2
def test_returns_browser_pool_instance(self): def test_returns_browser_pool_instance(self):
from app.platforms.ebay.browser_pool import get_pool, BrowserPool from app.platforms.ebay.browser_pool import get_pool, BrowserPool
@ -72,14 +64,12 @@ class TestGetPoolSingleton:
def test_default_size_is_two(self): def test_default_size_is_two(self):
from app.platforms.ebay.browser_pool import get_pool from app.platforms.ebay.browser_pool import get_pool
pool = get_pool() assert get_pool()._size == 2
assert pool._size == 2
def test_custom_size_from_env(self, monkeypatch): def test_custom_size_from_env(self, monkeypatch):
monkeypatch.setenv("BROWSER_POOL_SIZE", "5") monkeypatch.setenv("BROWSER_POOL_SIZE", "5")
from app.platforms.ebay.browser_pool import get_pool from app.platforms.ebay.browser_pool import get_pool
pool = get_pool() assert get_pool()._size == 5
assert pool._size == 5
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -88,17 +78,15 @@ class TestGetPoolSingleton:
class TestLifecycle: class TestLifecycle:
def test_start_is_noop_when_playwright_unavailable(self): def test_start_is_noop_when_playwright_unavailable(self):
"""Pool should handle missing Playwright gracefully — no error raised."""
from app.platforms.ebay.browser_pool import BrowserPool from app.platforms.ebay.browser_pool import BrowserPool
pool = BrowserPool(size=2) pool = BrowserPool(size=2)
with patch.object(pool, "_check_playwright", return_value=False): with patch.object(pool, "_check_playwright", return_value=False):
pool.start() # must not raise pool.start()
# Pool queue is empty — no slots launched. assert pool._started is True
assert pool._q.empty() assert pool._slot_registry == {}
def test_start_only_runs_once(self): def test_start_only_runs_once(self):
"""Calling start() twice must not double-warm."""
from app.platforms.ebay.browser_pool import BrowserPool from app.platforms.ebay.browser_pool import BrowserPool
pool = BrowserPool(size=1) pool = BrowserPool(size=1)
@ -107,47 +95,46 @@ class TestLifecycle:
pool.start() pool.start()
assert pool._started is True assert pool._started is True
def test_stop_drains_queue(self): def test_stop_closes_all_registry_slots(self):
"""stop() should close every slot in the queue."""
from app.platforms.ebay.browser_pool import BrowserPool from app.platforms.ebay.browser_pool import BrowserPool
pool = BrowserPool(size=2) pool = BrowserPool(size=2)
slot1 = _make_fake_slot() slot1 = _make_fake_slot()
slot2 = _make_fake_slot() slot2 = _make_fake_slot()
pool._q.put(slot1) pool._slot_registry[1001] = slot1
pool._q.put(slot2) pool._slot_registry[1002] = slot2
with patch("app.platforms.ebay.browser_pool._close_slot") as mock_close: with patch("app.platforms.ebay.browser_pool._close_slot") as mock_close:
pool.stop() pool.stop()
assert mock_close.call_count == 2 assert mock_close.call_count == 2
assert pool._q.empty() assert pool._slot_registry == {}
assert pool._stopped is True assert pool._stopped is True
def test_stop_on_empty_pool_is_safe(self): def test_stop_on_empty_registry_is_safe(self):
from app.platforms.ebay.browser_pool import BrowserPool from app.platforms.ebay.browser_pool import BrowserPool
pool = BrowserPool(size=2) BrowserPool(size=2).stop()
pool.stop() # must not raise
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# fetch_html — pool hit path # fetch_html — thread-local slot hit path
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
class TestFetchHtmlPoolHit: class TestFetchHtmlSlotHit:
def test_uses_pooled_slot_and_replenishes(self): def test_uses_existing_slot_and_replenishes(self):
"""fetch_html should borrow a slot, call _fetch_with_slot, replenish."""
from app.platforms.ebay.browser_pool import BrowserPool from app.platforms.ebay.browser_pool import BrowserPool
import app.platforms.ebay.browser_pool as _mod
pool = BrowserPool(size=1) pool = BrowserPool(size=1)
slot = _make_fake_slot() slot = _make_fake_slot()
pool._q.put(slot) _mod._thread_local.slot = slot
fresh_slot = _make_fake_slot() fresh_slot = _make_fake_slot()
with ( with (
patch.object(pool, "_fetch_with_slot", return_value="<html>ok</html>") as mock_fetch, patch.object(pool, "_fetch_with_slot", return_value="<html>ok</html>") as mock_fetch,
patch("app.platforms.ebay.browser_pool._replenish_slot", return_value=fresh_slot) as mock_replenish, patch("app.platforms.ebay.browser_pool._replenish_slot", return_value=fresh_slot),
patch.object(pool, "_register_slot") as mock_register,
patch("time.sleep"), patch("time.sleep"),
): ):
html = pool.fetch_html("https://www.ebay.com/sch/i.html?_nkw=test", delay=0) html = pool.fetch_html("https://www.ebay.com/sch/i.html?_nkw=test", delay=0)
@ -157,21 +144,19 @@ class TestFetchHtmlPoolHit:
slot, "https://www.ebay.com/sch/i.html?_nkw=test", slot, "https://www.ebay.com/sch/i.html?_nkw=test",
wait_for_selector=None, wait_for_timeout_ms=2000, wait_for_selector=None, wait_for_timeout_ms=2000,
) )
mock_replenish.assert_called_once_with(slot) mock_register.assert_called_once_with(fresh_slot)
# Fresh slot returned to queue
assert pool._q.get_nowait() is fresh_slot
def test_delay_is_respected(self): def test_delay_is_respected(self):
"""fetch_html must call time.sleep(delay)."""
from app.platforms.ebay.browser_pool import BrowserPool from app.platforms.ebay.browser_pool import BrowserPool
import app.platforms.ebay.browser_pool as _mod
pool = BrowserPool(size=1) pool = BrowserPool(size=1)
slot = _make_fake_slot() _mod._thread_local.slot = _make_fake_slot()
pool._q.put(slot)
with ( with (
patch.object(pool, "_fetch_with_slot", return_value="<html/>"), patch.object(pool, "_fetch_with_slot", return_value="<html/>"),
patch("app.platforms.ebay.browser_pool._replenish_slot", return_value=_make_fake_slot()), patch("app.platforms.ebay.browser_pool._replenish_slot", return_value=_make_fake_slot()),
patch.object(pool, "_register_slot"),
patch("app.platforms.ebay.browser_pool.time") as mock_time, patch("app.platforms.ebay.browser_pool.time") as mock_time,
): ):
pool.fetch_html("https://example.com", delay=1.5) pool.fetch_html("https://example.com", delay=1.5)
@ -180,22 +165,19 @@ class TestFetchHtmlPoolHit:
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# fetch_html — pool empty / fallback path # fetch_html — no slot / fallback path
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
class TestFetchHtmlFallback: class TestFetchHtmlFallback:
def test_falls_back_to_fresh_browser_when_pool_empty(self): def test_falls_back_when_no_slot_and_playwright_unavailable(self):
"""When pool is empty after timeout, _fetch_fresh should be called."""
from app.platforms.ebay.browser_pool import BrowserPool from app.platforms.ebay.browser_pool import BrowserPool
pool = BrowserPool(size=1) pool = BrowserPool(size=1)
# Queue is empty — no slots available. # No thread-local slot; playwright unavailable → _get_or_create returns None.
with ( with (
patch.object(pool, "_get_or_create_thread_slot", return_value=None),
patch.object(pool, "_fetch_fresh", return_value="<html>fresh</html>") as mock_fresh, patch.object(pool, "_fetch_fresh", return_value="<html>fresh</html>") as mock_fresh,
patch("time.sleep"), patch("time.sleep"),
# Make Queue.get raise Empty after a short wait.
patch.object(pool._q, "get", side_effect=queue.Empty),
): ):
html = pool.fetch_html("https://www.ebay.com/sch/i.html?_nkw=widget", delay=0) html = pool.fetch_html("https://www.ebay.com/sch/i.html?_nkw=widget", delay=0)
@ -206,17 +188,18 @@ class TestFetchHtmlFallback:
) )
def test_falls_back_when_pooled_fetch_raises(self): def test_falls_back_when_pooled_fetch_raises(self):
"""If _fetch_with_slot raises, the slot is closed and _fetch_fresh is used."""
from app.platforms.ebay.browser_pool import BrowserPool from app.platforms.ebay.browser_pool import BrowserPool
import app.platforms.ebay.browser_pool as _mod
pool = BrowserPool(size=1) pool = BrowserPool(size=1)
slot = _make_fake_slot() slot = _make_fake_slot()
pool._q.put(slot) _mod._thread_local.slot = slot
with ( with (
patch.object(pool, "_fetch_with_slot", side_effect=RuntimeError("Chromium crashed")), patch.object(pool, "_fetch_with_slot", side_effect=RuntimeError("Chromium crashed")),
patch.object(pool, "_fetch_fresh", return_value="<html>recovered</html>") as mock_fresh, patch.object(pool, "_fetch_fresh", return_value="<html>recovered</html>") as mock_fresh,
patch("app.platforms.ebay.browser_pool._close_slot") as mock_close, patch("app.platforms.ebay.browser_pool._close_slot") as mock_close,
patch.object(pool, "_unregister_slot"),
patch("time.sleep"), patch("time.sleep"),
): ):
html = pool.fetch_html("https://www.ebay.com/", delay=0) html = pool.fetch_html("https://www.ebay.com/", delay=0)
@ -226,19 +209,107 @@ class TestFetchHtmlFallback:
mock_fresh.assert_called_once() mock_fresh.assert_called_once()
# ---------------------------------------------------------------------------
# Thread-local slot management
# ---------------------------------------------------------------------------
class TestThreadLocalSlotManagement:
    """Tests for BrowserPool's per-thread slot helpers.

    Covers _get_or_create_thread_slot, _register_slot and _unregister_slot,
    including the guarantee that two concurrently running threads end up
    with distinct slots.
    """

    def test_get_or_create_returns_existing_slot(self):
        """A slot already stored in thread-local storage is returned as-is."""
        import app.platforms.ebay.browser_pool as _mod
        from app.platforms.ebay.browser_pool import BrowserPool

        pool = BrowserPool(size=1)
        pool._playwright_available = True
        existing = _make_fake_slot()
        _mod._thread_local.slot = existing

        result = pool._get_or_create_thread_slot()
        assert result is existing

    def test_get_or_create_launches_new_slot_when_absent(self):
        """With no thread-local slot, a new one is launched and registered."""
        import app.platforms.ebay.browser_pool as _mod
        from app.platforms.ebay.browser_pool import BrowserPool

        pool = BrowserPool(size=1)
        pool._playwright_available = True
        _mod._thread_local.slot = None
        new_slot = _make_fake_slot()

        with (
            patch("app.platforms.ebay.browser_pool._launch_slot", return_value=new_slot),
            patch.object(pool, "_register_slot") as mock_register,
        ):
            result = pool._get_or_create_thread_slot()

        assert result is new_slot
        mock_register.assert_called_once_with(new_slot)

    def test_get_or_create_returns_none_when_playwright_unavailable(self):
        """No slot is produced when the cached availability flag is False."""
        from app.platforms.ebay.browser_pool import BrowserPool

        pool = BrowserPool(size=1)
        pool._playwright_available = False
        assert pool._get_or_create_thread_slot() is None

    def test_register_slot_sets_thread_local_and_registry(self):
        """_register_slot stores the slot in both thread-local and registry."""
        import app.platforms.ebay.browser_pool as _mod
        from app.platforms.ebay.browser_pool import BrowserPool

        pool = BrowserPool(size=1)
        slot = _make_fake_slot()
        pool._register_slot(slot)

        assert _mod._thread_local.slot is slot
        # Check the stored object, not just that the key exists.
        assert pool._slot_registry[threading.get_ident()] is slot

    def test_unregister_slot_clears_thread_local_and_registry(self):
        """_unregister_slot removes the slot from both places."""
        import app.platforms.ebay.browser_pool as _mod
        from app.platforms.ebay.browser_pool import BrowserPool

        pool = BrowserPool(size=1)
        slot = _make_fake_slot()
        pool._register_slot(slot)
        pool._unregister_slot()

        assert getattr(_mod._thread_local, "slot", None) is None
        assert threading.get_ident() not in pool._slot_registry

    def test_different_threads_get_independent_slots(self):
        """Two live threads each obtain their own, distinct slot."""
        from app.platforms.ebay.browser_pool import BrowserPool

        pool = BrowserPool(size=2)
        pool._playwright_available = True

        slots_seen: list = []
        errors: list = []
        # Keep both workers alive until each has obtained its slot, so their
        # thread identifiers cannot be recycled and collide in the registry.
        barrier = threading.Barrier(2)

        def worker():
            try:
                slots_seen.append(pool._get_or_create_thread_slot())
                barrier.wait(timeout=5)
            except Exception as exc:
                errors.append(exc)

        # Patch once, from the main thread: patch() swaps a process-global
        # module attribute, so entering it concurrently from several threads
        # can interleave and leave _launch_slot permanently replaced.
        with patch(
            "app.platforms.ebay.browser_pool._launch_slot",
            side_effect=_make_fake_slot,
        ):
            threads = [threading.Thread(target=worker) for _ in range(2)]
            for t in threads:
                t.start()
            for t in threads:
                t.join()

        assert errors == []
        assert len(slots_seen) == 2
        assert all(s is not None for s in slots_seen)
        assert slots_seen[0] is not slots_seen[1]
        assert len(pool._slot_registry) == 2
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# ImportError graceful fallback # ImportError graceful fallback
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
class TestImportErrorHandling: class TestImportErrorHandling:
def test_check_playwright_returns_false_on_import_error(self): def test_check_playwright_returns_false_on_import_error(self):
"""_check_playwright should cache False when playwright is not installed."""
from app.platforms.ebay.browser_pool import BrowserPool from app.platforms.ebay.browser_pool import BrowserPool
pool = BrowserPool(size=2) pool = BrowserPool(size=2)
with patch.dict("sys.modules", {"playwright": None, "playwright_stealth": None}): with patch.dict("sys.modules", {"playwright": None, "playwright_stealth": None}):
# Force re-check by clearing the cached value.
pool._playwright_available = None pool._playwright_available = None
result = pool._check_playwright() result = pool._check_playwright()
@ -246,12 +317,11 @@ class TestImportErrorHandling:
assert pool._playwright_available is False assert pool._playwright_available is False
def test_start_logs_warning_when_playwright_missing(self, caplog): def test_start_logs_warning_when_playwright_missing(self, caplog):
"""start() should log a warning and not crash when Playwright is absent."""
import logging import logging
from app.platforms.ebay.browser_pool import BrowserPool from app.platforms.ebay.browser_pool import BrowserPool
pool = BrowserPool(size=1) pool = BrowserPool(size=1)
pool._playwright_available = False # simulate missing pool._playwright_available = False
with patch.object(pool, "_check_playwright", return_value=False): with patch.object(pool, "_check_playwright", return_value=False):
with caplog.at_level(logging.WARNING, logger="app.platforms.ebay.browser_pool"): with caplog.at_level(logging.WARNING, logger="app.platforms.ebay.browser_pool"):
@ -260,87 +330,14 @@ class TestImportErrorHandling:
assert any("not available" in r.message for r in caplog.records) assert any("not available" in r.message for r in caplog.records)
def test_fetch_fresh_raises_runtime_error_when_playwright_missing(self):
    """With Playwright absent, _fetch_fresh raises RuntimeError, not ImportError."""
    from app.platforms.ebay.browser_pool import BrowserPool

    # A None entry in sys.modules makes importing that name raise ImportError.
    hidden_modules = {"playwright": None, "playwright.sync_api": None}
    pool = BrowserPool(size=1)

    with (
        patch.dict("sys.modules", hidden_modules),
        pytest.raises(RuntimeError, match="Playwright not installed"),
    ):
        pool._fetch_fresh("https://www.ebay.com/")
# ---------------------------------------------------------------------------
# Idle cleanup
# ---------------------------------------------------------------------------
class TestIdleCleanup:
    """Tests for idle-slot eviction on the queue-backed BrowserPool."""

    def test_idle_cleanup_closes_stale_slots(self):
        """Slots idle past _IDLE_TIMEOUT_SECS are closed; recently used ones stay queued."""
        from app.platforms.ebay.browser_pool import BrowserPool, _IDLE_TIMEOUT_SECS

        pool = BrowserPool(size=2)

        stale_slot = _make_fake_slot()
        stale_slot.last_used_ts = time.time() - (_IDLE_TIMEOUT_SECS + 60)

        fresh_slot = _make_fake_slot()
        fresh_slot.last_used_ts = time.time()

        pool._q.put(stale_slot)
        pool._q.put(fresh_slot)

        closed_slots = []

        def fake_close(s):
            closed_slots.append(s)

        with patch("app.platforms.ebay.browser_pool._close_slot", side_effect=fake_close):
            # Run one cleanup tick directly (not the full loop).
            # NOTE(review): the tick is replayed inline here, so the pool's
            # own _idle_cleanup_loop is not exercised by this test.
            now = time.time()
            idle_cutoff = now - _IDLE_TIMEOUT_SECS
            kept = []
            while True:
                try:
                    s = pool._q.get_nowait()
                except queue.Empty:
                    break
                if s.last_used_ts < idle_cutoff:
                    fake_close(s)
                else:
                    kept.append(s)
            for s in kept:
                pool._q.put(s)

        assert stale_slot in closed_slots
        assert fresh_slot not in closed_slots
        # Exactly one slot was evicted and exactly one went back on the queue.
        assert len(closed_slots) == 1
        assert pool._q.qsize() == 1

    def test_idle_cleanup_loop_stops_when_pool_stopped(self):
        """The cleanup loop returns promptly when _stopped is already True."""
        from app.platforms.ebay.browser_pool import BrowserPool

        pool = BrowserPool(size=1)
        pool._stopped = True

        # Replace sleep so the loop never waits a real cleanup interval.
        sleep_calls = []

        def fake_sleep(secs):
            sleep_calls.append(secs)

        with patch("app.platforms.ebay.browser_pool.time") as mock_time:
            mock_time.time.return_value = time.time()
            mock_time.sleep.side_effect = fake_sleep

            # daemon=True: if the loop fails to exit, the assertion below
            # reports it and the stray thread cannot keep the test process
            # alive after the run.
            t = threading.Thread(target=pool._idle_cleanup_loop, daemon=True)
            t.start()
            t.join(timeout=2.0)

        assert not t.is_alive(), "idle cleanup loop did not exit when _stopped=True"
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# _replenish_slot helper # _replenish_slot helper
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -355,12 +352,8 @@ class TestReplenishSlot:
browser.new_context.return_value = new_ctx browser.new_context.return_value = new_ctx
slot = _PooledBrowser( slot = _PooledBrowser(
xvfb=MagicMock(), xvfb=MagicMock(), pw=MagicMock(), browser=browser,
pw=MagicMock(), ctx=old_ctx, display_num=101, last_used_ts=time.time() - 10,
browser=browser,
ctx=old_ctx,
display_num=101,
last_used_ts=time.time() - 10,
) )
result = _replenish_slot(slot) result = _replenish_slot(slot)
@ -370,7 +363,6 @@ class TestReplenishSlot:
assert result.ctx is new_ctx assert result.ctx is new_ctx
assert result.browser is browser assert result.browser is browser
assert result.xvfb is slot.xvfb assert result.xvfb is slot.xvfb
# last_used_ts is refreshed
assert result.last_used_ts > slot.last_used_ts assert result.last_used_ts > slot.last_used_ts
@ -391,7 +383,6 @@ class TestCloseSlot:
xvfb=xvfb, pw=pw, browser=browser, ctx=ctx, xvfb=xvfb, pw=pw, browser=browser, ctx=ctx,
display_num=102, last_used_ts=time.time(), display_num=102, last_used_ts=time.time(),
) )
_close_slot(slot) _close_slot(slot)
ctx.close.assert_called_once() ctx.close.assert_called_once()
@ -401,7 +392,6 @@ class TestCloseSlot:
xvfb.wait.assert_called_once() xvfb.wait.assert_called_once()
def test_close_slot_ignores_exceptions(self): def test_close_slot_ignores_exceptions(self):
"""_close_slot must not raise even if components throw."""
from app.platforms.ebay.browser_pool import _close_slot, _PooledBrowser from app.platforms.ebay.browser_pool import _close_slot, _PooledBrowser
xvfb = MagicMock(spec=subprocess.Popen) xvfb = MagicMock(spec=subprocess.Popen)
@ -418,7 +408,6 @@ class TestCloseSlot:
xvfb=xvfb, pw=pw, browser=browser, ctx=ctx, xvfb=xvfb, pw=pw, browser=browser, ctx=ctx,
display_num=103, last_used_ts=time.time(), display_num=103, last_used_ts=time.time(),
) )
_close_slot(slot) # must not raise _close_slot(slot) # must not raise
@ -428,7 +417,6 @@ class TestCloseSlot:
class TestScraperUsesPool: class TestScraperUsesPool:
def test_fetch_url_delegates_to_pool(self): def test_fetch_url_delegates_to_pool(self):
"""ScrapedEbayAdapter._fetch_url must use the pool, not launch its own browser."""
from app.platforms.ebay.browser_pool import BrowserPool from app.platforms.ebay.browser_pool import BrowserPool
from app.platforms.ebay.scraper import ScrapedEbayAdapter from app.platforms.ebay.scraper import ScrapedEbayAdapter
from app.db.store import Store from app.db.store import Store
@ -440,7 +428,6 @@ class TestScraperUsesPool:
fake_pool.fetch_html.return_value = "<html>pooled</html>" fake_pool.fetch_html.return_value = "<html>pooled</html>"
with patch("app.platforms.ebay.browser_pool.get_pool", return_value=fake_pool): with patch("app.platforms.ebay.browser_pool.get_pool", return_value=fake_pool):
# Clear the cache so fetch_url actually hits the pool.
import app.platforms.ebay.scraper as scraper_mod import app.platforms.ebay.scraper as scraper_mod
scraper_mod._html_cache.clear() scraper_mod._html_cache.clear()
html = adapter._fetch_url("https://www.ebay.com/sch/i.html?_nkw=test") html = adapter._fetch_url("https://www.ebay.com/sch/i.html?_nkw=test")
@ -451,7 +438,6 @@ class TestScraperUsesPool:
) )
def test_fetch_url_uses_cache_before_pool(self): def test_fetch_url_uses_cache_before_pool(self):
"""_fetch_url should return cached HTML without hitting the pool."""
from app.platforms.ebay.scraper import ScrapedEbayAdapter, _html_cache, _HTML_CACHE_TTL from app.platforms.ebay.scraper import ScrapedEbayAdapter, _html_cache, _HTML_CACHE_TTL
from app.db.store import Store from app.db.store import Store
@ -467,6 +453,4 @@ class TestScraperUsesPool:
assert html == "<html>cached</html>" assert html == "<html>cached</html>"
fake_pool.fetch_html.assert_not_called() fake_pool.fetch_html.assert_not_called()
# Cleanup
_html_cache.pop(url, None) _html_cache.pop(url, None)