Compare commits

..

2 commits

Author SHA1 Message Date
29d2033ef2 feat: browser pool + search result cache (#47, #48)
Some checks are pending
CI / Python tests (push) Waiting to run
CI / Frontend typecheck + tests (push) Waiting to run
Mirror / mirror (push) Waiting to run
Release / release (push) Waiting to run
2026-04-20 11:57:56 -07:00
a83e0957e2 feat(api): short-TTL search result cache (SEARCH_CACHE_TTL_S=300 default) 2026-04-20 11:53:27 -07:00
2 changed files with 705 additions and 2 deletions

View file

@ -5,12 +5,14 @@ import asyncio
import csv
import dataclasses
import hashlib
import hashlib as _hashlib
import io
import json as _json
import logging
import os
import queue as _queue
import re
import time as _time
import uuid
from concurrent.futures import ThreadPoolExecutor
from contextlib import asynccontextmanager
@ -75,6 +77,61 @@ def _auth_label(user_id: str) -> str:
_update_queues: dict[str, _queue.SimpleQueue] = {}
# ── Short-TTL search result cache ────────────────────────────────────────────
# Caches raw eBay listings and market_price only — trust scores are NOT cached
# because they incorporate per-user signals (zero_feedback cap, etc.).
# On cache hit the trust scorer and seller lookups run against the local DB as
# normal; only the expensive Playwright/Browse API scrape is skipped.
#
# TTL is configurable via SEARCH_CACHE_TTL_S (default 300 s = 5 min).
# Listings are public eBay data — safe to share across all users.
_SEARCH_CACHE_TTL = int(os.environ.get("SEARCH_CACHE_TTL_S", "300"))
# key → ({"listings": [...raw dicts...], "market_price": float|None}, expiry_ts)
_search_result_cache: dict[str, tuple[dict, float]] = {}
# Throttle eviction sweeps to at most once per 60 s.
_last_eviction_ts: float = 0.0
def _cache_key(
q: str,
max_price: "float | None",
min_price: "float | None",
pages: int,
must_include: str,
must_include_mode: str,
must_exclude: str,
category_id: str,
) -> str:
"""Stable 16-char hex key for a search param set. Query is lower-cased + stripped."""
raw = (
f"{q.lower().strip()}|{max_price}|{min_price}|{pages}"
f"|{must_include.lower().strip()}|{must_include_mode}"
f"|{must_exclude.lower().strip()}|{category_id.strip()}"
)
return _hashlib.sha256(raw.encode()).hexdigest()[:16]
def _evict_expired_cache() -> None:
    """Remove stale entries from ``_search_result_cache``.

    Called opportunistically on each cache miss; rate-limited to once per
    60 s so a burst of concurrent misses does not trigger repeated sweeps.

    The cache dict is written by multiple worker threads (the async search
    workers store results on completion), so the scan snapshots
    ``items()`` into a list first — iterating the live dict view while
    another thread inserts an entry raises
    ``RuntimeError: dictionary changed size during iteration``.
    """
    global _last_eviction_ts
    now = _time.time()
    if now - _last_eviction_ts < 60.0:
        return
    _last_eviction_ts = now
    expired = [
        key
        for key, (_, expiry) in list(_search_result_cache.items())
        if expiry <= now
    ]
    for key in expired:
        # pop() with a default: another thread may have replaced or removed
        # the key between the snapshot above and this point.
        _search_result_cache.pop(key, None)
    if expired:
        log.debug("cache: evicted %d expired entries", len(expired))
# ── Community DB (optional — only active when COMMUNITY_DB_URL is set) ────────
# Holds SnipeCommunityStore at module level so endpoints can publish signals
# without constructing a new connection pool on every request.
@ -97,6 +154,21 @@ def _get_query_translator():
@asynccontextmanager
async def _lifespan(app: FastAPI):
global _community_store
# Pre-warm the Chromium browser pool so the first scrape request does not
# pay the full cold-start cost (5-10s Xvfb + browser launch).
# Pool size is controlled via BROWSER_POOL_SIZE env var (default: 2).
import threading as _threading
from app.platforms.ebay.browser_pool import get_pool as _get_browser_pool
_browser_pool = _get_browser_pool()
_pool_thread = _threading.Thread(
target=_browser_pool.start, daemon=True, name="browser-pool-start"
)
_pool_thread.start()
log.info(
"BrowserPool: pre-warm started in background (BROWSER_POOL_SIZE=%s)",
os.environ.get("BROWSER_POOL_SIZE", "2"),
)
# Start vision/LLM background task scheduler.
# background_tasks queue lives in shared_db (cloud) or local_db (local)
# so the scheduler has a single stable DB path across all cloud users.
@ -204,6 +276,13 @@ async def _lifespan(app: FastAPI):
get_scheduler(sched_db).shutdown(timeout=10.0)
reset_scheduler()
log.info("Snipe task scheduler stopped.")
# Drain and close all pre-warmed browser pool slots.
try:
_browser_pool.stop()
except Exception:
log.warning("BrowserPool: error during shutdown", exc_info=True)
if _community_store is not None:
try:
_community_store._db.close()
@ -633,6 +712,7 @@ def search(
must_exclude: str = "", # comma-separated; forwarded to eBay -term + client-side
category_id: str = "", # eBay category ID — forwarded to Browse API / scraper _sacat
adapter: str = "auto", # "auto" | "api" | "scraper" — override adapter selection
refresh: bool = False, # when True, bypass cache read (still writes fresh result)
session: CloudUser = Depends(get_session),
):
# If the user pasted an eBay listing or checkout URL, extract the item ID
@ -685,6 +765,117 @@ def search(
shared_db = session.shared_db
user_db = session.user_db
# ── Cache lookup (synchronous endpoint) ──────────────────────────────────
cache_key = _cache_key(q, max_price, min_price, pages, must_include, must_include_mode, must_exclude, category_id)
cached_listings_dicts: "list | None" = None
cached_market_price: "float | None" = None
if not refresh:
cached = _search_result_cache.get(cache_key)
if cached is not None:
payload, expiry = cached
if expiry > _time.time():
log.info("cache: hit key=%s q=%r", cache_key, q)
cached_listings_dicts = payload["listings"]
cached_market_price = payload["market_price"]
if cached_listings_dicts is not None:
# Cache hit path: reconstruct listings as plain dicts (already serialised),
# re-run trust scorer against the local DB so per-user signals are fresh,
# and kick off background enrichment as normal.
import sqlite3 as _sqlite3
affiliate_active = bool(os.environ.get("EBAY_AFFILIATE_CAMPAIGN_ID", "").strip())
session_id = str(uuid.uuid4())
_update_queues[session_id] = _queue.SimpleQueue()
try:
shared_store = Store(shared_db)
user_store = Store(user_db)
# Re-hydrate Listing dataclass instances from the cached dicts so the
# scorer and DB calls receive proper typed objects.
from app.db.models import Listing as _Listing
listings = [_Listing(**d) for d in cached_listings_dicts]
# Re-save to user_store so staging fields are current for this session.
user_store.save_listings(listings)
staged = user_store.get_listings_staged("ebay", [l.platform_listing_id for l in listings])
listings = [staged.get(l.platform_listing_id, l) for l in listings]
# Fresh trust scores against local DB (not cached — user-specific).
scorer = TrustScorer(shared_store)
trust_scores_list = scorer.score_batch(listings, q)
user_store.save_trust_scores(trust_scores_list)
features = compute_features(session.tier)
if features.photo_analysis:
_enqueue_vision_tasks(listings, trust_scores_list, session)
trust_map = {
listing.platform_listing_id: dataclasses.asdict(ts)
for listing, ts in zip(listings, trust_scores_list)
if ts is not None
}
seller_map = {
listing.seller_platform_id: dataclasses.asdict(
shared_store.get_seller("ebay", listing.seller_platform_id)
)
for listing in listings
if listing.seller_platform_id
and shared_store.get_seller("ebay", listing.seller_platform_id)
}
_is_unauthed = session.user_id == "anonymous" or session.user_id.startswith("guest:")
_pref_store = None if _is_unauthed else user_store
def _get_pref_cached(uid: Optional[str], path: str, default=None):
return _pref_store.get_user_preference(path, default=default) # type: ignore[union-attr]
def _serialize_listing_cached(l: object) -> dict:
d = dataclasses.asdict(l)
d["url"] = _wrap_affiliate_url(
d["url"],
retailer="ebay",
user_id=None if _is_unauthed else session.user_id,
get_preference=_get_pref_cached if _pref_store is not None else None,
)
return d
# Kick off BTF enrichment so live score updates still flow.
_trigger_scraper_enrichment(
listings, shared_store, shared_db,
user_db=user_db, query=comp_query, session_id=session_id,
)
return {
"listings": [_serialize_listing_cached(l) for l in listings],
"trust_scores": trust_map,
"sellers": seller_map,
"market_price": cached_market_price,
"adapter_used": adapter_used,
"affiliate_active": affiliate_active,
"session_id": session_id,
}
except _sqlite3.OperationalError as e:
log.warning("search (cache hit) DB contention: %s", e)
_update_queues.pop(session_id, None)
return {
"listings": cached_listings_dicts,
"trust_scores": {},
"sellers": {},
"market_price": cached_market_price,
"adapter_used": adapter_used,
"affiliate_active": affiliate_active,
"session_id": None,
}
# ── Cache miss — run full scrape ─────────────────────────────────────────
_evict_expired_cache()
log.info("cache: miss key=%s q=%r", cache_key, q)
# Each thread creates its own Store — sqlite3 check_same_thread=True.
def _run_search(ebay_query: str) -> list:
return _make_adapter(Store(shared_db), adapter).search(ebay_query, base_filters)
@ -796,6 +987,14 @@ def search(
comp = shared_store.get_market_comp("ebay", query_hash)
market_price = comp.median_price if comp else None
# Store raw listings (as dicts) + market_price in cache.
# Trust scores and seller enrichment are intentionally excluded — they
# incorporate per-user signals and must be computed fresh each time.
_search_result_cache[cache_key] = (
{"listings": [dataclasses.asdict(l) for l in listings], "market_price": market_price},
_time.time() + _SEARCH_CACHE_TTL,
)
# Serialize — keyed by platform_listing_id for easy Vue lookup
trust_map = {
listing.platform_listing_id: dataclasses.asdict(ts)
@ -873,6 +1072,7 @@ def search_async(
must_exclude: str = "",
category_id: str = "",
adapter: str = "auto",
refresh: bool = False, # when True, bypass cache read (still writes fresh result)
session: CloudUser = Depends(get_session),
):
"""Async variant of GET /api/search.
@ -923,10 +1123,11 @@ def search_async(
_tier = session.tier
_user_id = session.user_id
_affiliate_active = bool(os.environ.get("EBAY_AFFILIATE_CAMPAIGN_ID", "").strip())
_refresh = refresh # capture before the closure is dispatched
def _background_search() -> None:
"""Run the full search pipeline and push SSE events to the queue."""
import hashlib as _hashlib
import hashlib as _hashlib_local
import sqlite3 as _sqlite3
q_norm = q # captured from outer scope
@ -965,6 +1166,100 @@ def search_async(
if sq is not None:
sq.put(event)
# ── Cache lookup (async background worker) ────────────────────────────
async_cache_key = _cache_key(
q_norm, max_price, min_price, pages,
must_include, must_include_mode, must_exclude, category_id,
)
if not _refresh:
cached = _search_result_cache.get(async_cache_key)
if cached is not None:
payload, expiry = cached
if expiry > _time.time():
log.info("cache: hit key=%s q=%r", async_cache_key, q_norm)
from app.db.models import Listing as _Listing
cached_listings_raw = payload["listings"]
cached_market_price = payload["market_price"]
try:
shared_store = Store(_shared_db)
user_store = Store(_user_db)
listings = [_Listing(**d) for d in cached_listings_raw]
user_store.save_listings(listings)
staged = user_store.get_listings_staged(
"ebay", [l.platform_listing_id for l in listings]
)
listings = [staged.get(l.platform_listing_id, l) for l in listings]
scorer = TrustScorer(shared_store)
trust_scores_list = scorer.score_batch(listings, q_norm)
user_store.save_trust_scores(trust_scores_list)
features_obj = compute_features(_tier)
if features_obj.photo_analysis:
from api.cloud_session import CloudUser as _CloudUser
_sess_stub = _CloudUser(
user_id=_user_id, tier=_tier,
shared_db=_shared_db, user_db=_user_db,
)
_enqueue_vision_tasks(listings, trust_scores_list, _sess_stub)
trust_map = {
listing.platform_listing_id: dataclasses.asdict(ts)
for listing, ts in zip(listings, trust_scores_list)
if ts is not None
}
seller_map = {
listing.seller_platform_id: dataclasses.asdict(
shared_store.get_seller("ebay", listing.seller_platform_id)
)
for listing in listings
if listing.seller_platform_id
and shared_store.get_seller("ebay", listing.seller_platform_id)
}
_is_unauthed = _user_id == "anonymous" or _user_id.startswith("guest:")
_pref_store_hit = None if _is_unauthed else user_store
def _get_pref_hit(uid: Optional[str], path: str, default=None):
return _pref_store_hit.get_user_preference(path, default=default) # type: ignore[union-attr]
def _serialize_hit(l: object) -> dict:
d = dataclasses.asdict(l)
d["url"] = _wrap_affiliate_url(
d["url"],
retailer="ebay",
user_id=None if _is_unauthed else _user_id,
get_preference=_get_pref_hit if _pref_store_hit is not None else None,
)
return d
_push({
"type": "listings",
"listings": [_serialize_hit(l) for l in listings],
"trust_scores": trust_map,
"sellers": seller_map,
"market_price": cached_market_price,
"adapter_used": adapter_used,
"affiliate_active": _affiliate_active,
"session_id": session_id,
})
# Enrichment still runs so live score updates flow.
_trigger_scraper_enrichment(
listings, shared_store, _shared_db,
user_db=_user_db, query=comp_query, session_id=session_id,
)
return # done — no scraping needed
except Exception as exc:
log.warning(
"cache hit path failed, falling through to scrape: %s", exc
)
# Fall through to full scrape below.
# ── Cache miss — evict stale entries, then scrape ─────────────────────
_evict_expired_cache()
log.info("cache: miss key=%s q=%r", async_cache_key, q_norm)
try:
def _run_search(ebay_query: str) -> list:
return _make_adapter(Store(_shared_db), adapter).search(ebay_query, base_filters)
@ -1038,10 +1333,16 @@ def search_async(
if features_obj.photo_analysis:
_enqueue_vision_tasks(listings, trust_scores_list, _session_stub)
query_hash = _hashlib.md5(comp_query.encode()).hexdigest()
query_hash = _hashlib_local.md5(comp_query.encode()).hexdigest()
comp = shared_store.get_market_comp("ebay", query_hash)
market_price = comp.median_price if comp else None
# Store raw listings + market_price in cache (trust scores excluded).
_search_result_cache[async_cache_key] = (
{"listings": [dataclasses.asdict(l) for l in listings], "market_price": market_price},
_time.time() + _SEARCH_CACHE_TTL,
)
trust_map = {
listing.platform_listing_id: dataclasses.asdict(ts)
for listing, ts in zip(listings, trust_scores_list)

402
tests/test_search_cache.py Normal file
View file

@ -0,0 +1,402 @@
"""Tests for the short-TTL search result cache in api/main.py.
Covers:
- _cache_key stability (same inputs same key)
- _cache_key uniqueness (different inputs different keys)
- cache hit path returns early without scraping (async worker)
- cache miss path stores result in _search_result_cache
- refresh=True bypasses cache read (still writes fresh result)
- TTL expiry: expired entries are not returned as hits
- _evict_expired_cache removes expired entries
"""
from __future__ import annotations
import os
import queue as _queue
import time
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
# ── Helpers ───────────────────────────────────────────────────────────────────
def _clear_cache():
    """Wipe api.main's module-level search-cache state (entries + eviction throttle)."""
    from api import main as app_main

    app_main._search_result_cache.clear()
    app_main._last_eviction_ts = 0.0
@pytest.fixture(autouse=True)
def isolated_cache():
    """Start every test with an empty cache, and leave it empty afterwards."""
    _clear_cache()
    yield
    _clear_cache()
@pytest.fixture
def client(tmp_path):
    """Build a TestClient whose SNIPE_DB points at a fresh per-test database."""
    db_file = tmp_path / "snipe.db"
    os.environ["SNIPE_DB"] = str(db_file)

    from fastapi.testclient import TestClient

    from api.main import app

    return TestClient(app, raise_server_exceptions=False)
def _make_mock_listing(listing_id: str = "123456789", seller_id: str = "test_seller"):
"""Return a MagicMock listing (for use where asdict() is NOT called on it)."""
m = MagicMock()
m.platform_listing_id = listing_id
m.seller_platform_id = seller_id
m.title = "Test GPU"
m.price = 100.0
m.currency = "USD"
m.condition = "Used"
m.url = f"https://www.ebay.com/itm/{listing_id}"
m.photo_urls = []
m.listing_age_days = 5
m.buying_format = "fixed_price"
m.ends_at = None
m.fetched_at = None
m.trust_score_id = None
m.id = 1
m.category_name = None
return m
def _make_real_listing(listing_id: str = "123456789", seller_id: str = "test_seller"):
    """Construct a genuine Listing dataclass (for paths where asdict() IS called)."""
    from app.db.models import Listing

    fields = dict(
        platform="ebay",
        platform_listing_id=listing_id,
        title="Test GPU",
        price=100.0,
        currency="USD",
        condition="Used",
        seller_platform_id=seller_id,
        url=f"https://www.ebay.com/itm/{listing_id}",
        photo_urls=[],
        listing_age_days=5,
        buying_format="fixed_price",
        id=None,
    )
    return Listing(**fields)
# ── _cache_key unit tests ─────────────────────────────────────────────────────
def test_cache_key_stable_for_same_inputs():
    """Hashing the identical parameter set twice must yield the same key."""
    from api.main import _cache_key

    params = ("rtx 3080", 400.0, 100.0, 2, "rtx,3080", "all", "mining", "27386")
    assert _cache_key(*params) == _cache_key(*params)
def test_cache_key_case_normalised():
    """Upper- and lower-case spellings of a query collapse to one key."""
    from api.main import _cache_key

    upper = _cache_key("RTX 3080", None, None, 1, "", "all", "", "")
    lower = _cache_key("rtx 3080", None, None, 1, "", "all", "", "")
    assert upper == lower
def test_cache_key_differs_on_query_change():
    """Two distinct query strings may not share a cache key."""
    from api.main import _cache_key

    keys = {
        _cache_key(query, None, None, 1, "", "all", "", "")
        for query in ("rtx 3080", "gtx 1080")
    }
    assert len(keys) == 2
def test_cache_key_differs_on_price_filter():
    """Changing max_price must change the key."""
    from api.main import _cache_key

    keys = {
        _cache_key("gpu", cap, None, 1, "", "all", "", "")
        for cap in (400.0, 500.0)
    }
    assert len(keys) == 2
def test_cache_key_differs_on_min_price():
    """Changing min_price must change the key."""
    from api.main import _cache_key

    keys = {
        _cache_key("gpu", None, floor, 1, "", "all", "", "")
        for floor in (50.0, 100.0)
    }
    assert len(keys) == 2
def test_cache_key_differs_on_pages():
    """Changing the page count must change the key."""
    from api.main import _cache_key

    keys = {
        _cache_key("gpu", None, None, page_count, "", "all", "", "")
        for page_count in (1, 2)
    }
    assert len(keys) == 2
def test_cache_key_differs_on_must_include():
    """Changing the must_include terms must change the key."""
    from api.main import _cache_key

    keys = {
        _cache_key("gpu", None, None, 1, terms, "all", "", "")
        for terms in ("rtx", "gtx")
    }
    assert len(keys) == 2
def test_cache_key_differs_on_must_exclude():
    """Changing the must_exclude terms must change the key."""
    from api.main import _cache_key

    keys = {
        _cache_key("gpu", None, None, 1, "", "all", terms, "")
        for terms in ("mining", "defective")
    }
    assert len(keys) == 2
def test_cache_key_differs_on_category_id():
    """Changing the category_id must change the key."""
    from api.main import _cache_key

    keys = {
        _cache_key("gpu", None, None, 1, "", "all", "", category)
        for category in ("27386", "12345")
    }
    assert len(keys) == 2
def test_cache_key_is_16_chars():
    """Keys are exactly 16 lower-case hex characters."""
    from api.main import _cache_key

    key = _cache_key("gpu", None, None, 1, "", "all", "", "")
    assert len(key) == 16
    assert set(key) <= set("0123456789abcdef")
# ── TTL / eviction unit tests ─────────────────────────────────────────────────
def test_expired_entry_is_not_returned_as_hit():
    """An entry whose expiry timestamp has passed must not count as a hit."""
    import api.main as _main
    from api.main import _cache_key

    key = _cache_key("gpu", None, None, 1, "", "all", "", "")
    stale_expiry = time.time() - 1.0  # expired one second ago
    _main._search_result_cache[key] = (
        {"listings": [], "market_price": None},
        stale_expiry,
    )

    entry = _main._search_result_cache.get(key)
    assert entry is not None
    _payload, expiry = entry
    # Mirror the freshness check the endpoint performs in api/main.py.
    assert expiry <= time.time(), "Entry should be expired"
def test_evict_expired_cache_removes_stale_entries():
    """A sweep drops stale entries while fresh ones survive."""
    import api.main as _main
    from api.main import _cache_key, _evict_expired_cache

    dead_key = _cache_key("old query", None, None, 1, "", "all", "", "")
    live_key = _cache_key("new query", None, None, 1, "", "all", "", "")
    now = time.time()
    _main._search_result_cache[dead_key] = (
        {"listings": [], "market_price": None},
        now - 10.0,  # already expired
    )
    _main._search_result_cache[live_key] = (
        {"listings": [], "market_price": 99.0},
        now + 300.0,  # valid for 5 min
    )

    _main._last_eviction_ts = 0.0  # disarm the 60 s throttle
    _evict_expired_cache()

    assert dead_key not in _main._search_result_cache
    assert live_key in _main._search_result_cache
def test_evict_is_rate_limited():
    """A sweep attempted within 60 s of the previous one is skipped entirely."""
    import api.main as _main
    from api.main import _cache_key, _evict_expired_cache

    stale_key = _cache_key("stale", None, None, 1, "", "all", "", "")
    _main._search_result_cache[stale_key] = (
        {"listings": [], "market_price": None},
        time.time() - 5.0,
    )
    _main._last_eviction_ts = time.time()  # pretend a sweep just ran

    _evict_expired_cache()

    # The throttle must have kept even this stale entry alive.
    assert stale_key in _main._search_result_cache
# ── Integration tests — async endpoint cache hit ──────────────────────────────
def test_async_cache_hit_skips_scraper(client, tmp_path):
    """With a warm cache entry, the async worker must never invoke the scraper."""
    import threading

    import api.main as _main
    from api.main import _cache_key

    # Seed a still-valid entry for the exact query the request will send.
    key = _cache_key("rtx 3080", None, None, 1, "", "all", "", "")
    _main._search_result_cache[key] = (
        {"listings": [], "market_price": 250.0},
        time.time() + 300.0,
    )

    hit_scraper = threading.Event()

    def _spy_search(query, filters):
        hit_scraper.set()
        return []

    with (
        patch("api.main._make_adapter") as adapter_factory,
        patch("api.main._trigger_scraper_enrichment"),
        patch("api.main.TrustScorer") as scorer_cls,
        patch("api.main.Store") as store_cls,
    ):
        adapter = MagicMock()
        adapter.search.side_effect = _spy_search
        adapter.get_completed_sales.return_value = None
        adapter_factory.return_value = adapter

        scorer_cls.return_value = MagicMock(score_batch=MagicMock(return_value=[]))

        store = MagicMock()
        store.configure_mock(**{
            "get_listings_staged.return_value": {},
            "refresh_seller_categories.return_value": 0,
            "save_listings.return_value": None,
            "save_trust_scores.return_value": None,
            "get_market_comp.return_value": None,
            "get_seller.return_value": None,
            "get_user_preference.return_value": None,
        })
        store_cls.return_value = store

        response = client.get("/api/search/async?q=rtx+3080")
        assert response.status_code == 202

        # Give the background worker a moment to run.
        hit_scraper.wait(timeout=3.0)
        assert not hit_scraper.is_set(), "Scraper was called despite a warm cache hit"
def test_async_cache_miss_stores_result(client, tmp_path):
    """A cold-cache search must leave its result behind in _search_result_cache."""
    import threading

    import api.main as _main
    from api.main import _cache_key

    worker_finished = threading.Event()
    listing = _make_real_listing()

    with (
        patch("api.main._make_adapter") as adapter_factory,
        patch("api.main._trigger_scraper_enrichment") as enrichment,
        patch("api.main.TrustScorer") as scorer_cls,
        patch("api.main.Store") as store_cls,
    ):
        adapter = MagicMock()
        adapter.search.side_effect = lambda query, filters: [listing]
        adapter.get_completed_sales.return_value = None
        adapter_factory.return_value = adapter

        scorer_cls.return_value = MagicMock(score_batch=MagicMock(return_value=[]))

        store = MagicMock()
        store.configure_mock(**{
            "get_listings_staged.return_value": {
                listing.platform_listing_id: listing,
            },
            "refresh_seller_categories.return_value": 0,
            "save_listings.return_value": None,
            "save_trust_scores.return_value": None,
            "get_market_comp.return_value": None,
            "get_seller.return_value": None,
            "get_user_preference.return_value": None,
        })
        store_cls.return_value = store

        # Enrichment is the last pipeline step — use it as a completion signal.
        enrichment.side_effect = lambda *args, **kwargs: worker_finished.set()

        response = client.get("/api/search/async?q=rtx+3080")
        assert response.status_code == 202

        # Wait until the background worker reaches _trigger_scraper_enrichment.
        worker_finished.wait(timeout=5.0)
        assert worker_finished.is_set(), "Background search worker never completed"

        key = _cache_key("rtx 3080", None, None, 1, "", "all", "", "")
        assert key in _main._search_result_cache, "Result was not stored in cache after miss"
        payload, expiry = _main._search_result_cache[key]
        assert expiry > time.time(), "Cache entry has already expired"
        assert "listings" in payload
# ── Integration tests — async endpoint refresh=True ──────────────────────────
def test_async_refresh_bypasses_cache_read(client, tmp_path):
    """refresh=true must ignore the warm cache entry and hit the scraper anyway."""
    import threading

    import api.main as _main
    from api.main import _cache_key

    # A perfectly valid entry that refresh=true should skip right past.
    key = _cache_key("rtx 3080", None, None, 1, "", "all", "", "")
    _main._search_result_cache[key] = (
        {"listings": [], "market_price": 100.0},
        time.time() + 300.0,
    )

    hit_scraper = threading.Event()

    def _spy_search(query, filters):
        hit_scraper.set()
        return []

    with (
        patch("api.main._make_adapter") as adapter_factory,
        patch("api.main._trigger_scraper_enrichment"),
        patch("api.main.TrustScorer") as scorer_cls,
        patch("api.main.Store") as store_cls,
    ):
        adapter = MagicMock()
        adapter.search.side_effect = _spy_search
        adapter.get_completed_sales.return_value = None
        adapter_factory.return_value = adapter

        scorer_cls.return_value = MagicMock(score_batch=MagicMock(return_value=[]))

        store = MagicMock()
        store.configure_mock(**{
            "get_listings_staged.return_value": {},
            "refresh_seller_categories.return_value": 0,
            "save_listings.return_value": None,
            "save_trust_scores.return_value": None,
            "get_market_comp.return_value": None,
            "get_seller.return_value": None,
            "get_user_preference.return_value": None,
        })
        store_cls.return_value = store

        response = client.get("/api/search/async?q=rtx+3080&refresh=true")
        assert response.status_code == 202

        hit_scraper.wait(timeout=5.0)
        assert hit_scraper.is_set(), "Scraper was not called even though refresh=True"