diff --git a/api/main.py b/api/main.py index 3a73a77..05aef6e 100644 --- a/api/main.py +++ b/api/main.py @@ -5,12 +5,14 @@ import asyncio import csv import dataclasses import hashlib +import hashlib as _hashlib import io import json as _json import logging import os import queue as _queue import re +import time as _time import uuid from concurrent.futures import ThreadPoolExecutor from contextlib import asynccontextmanager @@ -75,6 +77,61 @@ def _auth_label(user_id: str) -> str: _update_queues: dict[str, _queue.SimpleQueue] = {} +# ── Short-TTL search result cache ──────────────────────────────────────────── +# Caches raw eBay listings and market_price only — trust scores are NOT cached +# because they incorporate per-user signals (zero_feedback cap, etc.). +# On cache hit the trust scorer and seller lookups run against the local DB as +# normal; only the expensive Playwright/Browse API scrape is skipped. +# +# TTL is configurable via SEARCH_CACHE_TTL_S (default 300 s = 5 min). +# Listings are public eBay data — safe to share across all users. + +_SEARCH_CACHE_TTL = int(os.environ.get("SEARCH_CACHE_TTL_S", "300")) + +# key → ({"listings": [...raw dicts...], "market_price": float|None}, expiry_ts) +_search_result_cache: dict[str, tuple[dict, float]] = {} + +# Throttle eviction sweeps to at most once per 60 s. +_last_eviction_ts: float = 0.0 + + +def _cache_key( + q: str, + max_price: "float | None", + min_price: "float | None", + pages: int, + must_include: str, + must_include_mode: str, + must_exclude: str, + category_id: str, +) -> str: + """Stable 16-char hex key for a search param set. Query is lower-cased + stripped.""" + raw = ( + f"{q.lower().strip()}|{max_price}|{min_price}|{pages}" + f"|{must_include.lower().strip()}|{must_include_mode}" + f"|{must_exclude.lower().strip()}|{category_id.strip()}" + ) + return _hashlib.sha256(raw.encode()).hexdigest()[:16] + + +def _evict_expired_cache() -> None: + """Remove stale entries from _search_result_cache. 
+ + Called opportunistically on each cache miss; rate-limited to once per 60 s + to avoid quadratic overhead when many concurrent misses arrive at once. + """ + global _last_eviction_ts + now = _time.time() + if now - _last_eviction_ts < 60.0: + return + _last_eviction_ts = now + expired = [k for k, (_, exp) in list(_search_result_cache.items()) if exp <= now] # snapshot first: other request threads may insert while we scan + for k in expired: + _search_result_cache.pop(k, None) + if expired: + log.debug("cache: evicted %d expired entries", len(expired)) + + # ── Community DB (optional — only active when COMMUNITY_DB_URL is set) ──────── # Holds SnipeCommunityStore at module level so endpoints can publish signals # without constructing a new connection pool on every request. @@ -97,6 +154,21 @@ def _get_query_translator(): @asynccontextmanager async def _lifespan(app: FastAPI): global _community_store + # Pre-warm the Chromium browser pool so the first scrape request does not + # pay the full cold-start cost (5-10s Xvfb + browser launch). + # Pool size is controlled via BROWSER_POOL_SIZE env var (default: 2). + import threading as _threading + from app.platforms.ebay.browser_pool import get_pool as _get_browser_pool + _browser_pool = _get_browser_pool() + _pool_thread = _threading.Thread( + target=_browser_pool.start, daemon=True, name="browser-pool-start" + ) + _pool_thread.start() + log.info( + "BrowserPool: pre-warm started in background (BROWSER_POOL_SIZE=%s)", + os.environ.get("BROWSER_POOL_SIZE", "2"), + ) + # Start vision/LLM background task scheduler. # background_tasks queue lives in shared_db (cloud) or local_db (local) # so the scheduler has a single stable DB path across all cloud users. @@ -204,6 +276,13 @@ async def _lifespan(app: FastAPI): get_scheduler(sched_db).shutdown(timeout=10.0) reset_scheduler() log.info("Snipe task scheduler stopped.") + + # Drain and close all pre-warmed browser pool slots. 
+ try: + _browser_pool.stop() + except Exception: + log.warning("BrowserPool: error during shutdown", exc_info=True) + if _community_store is not None: try: _community_store._db.close() @@ -633,6 +712,7 @@ def search( must_exclude: str = "", # comma-separated; forwarded to eBay -term + client-side category_id: str = "", # eBay category ID — forwarded to Browse API / scraper _sacat adapter: str = "auto", # "auto" | "api" | "scraper" — override adapter selection + refresh: bool = False, # when True, bypass cache read (still writes fresh result) session: CloudUser = Depends(get_session), ): # If the user pasted an eBay listing or checkout URL, extract the item ID @@ -685,6 +765,117 @@ def search( shared_db = session.shared_db user_db = session.user_db + # ── Cache lookup (synchronous endpoint) ────────────────────────────────── + cache_key = _cache_key(q, max_price, min_price, pages, must_include, must_include_mode, must_exclude, category_id) + + cached_listings_dicts: "list | None" = None + cached_market_price: "float | None" = None + + if not refresh: + cached = _search_result_cache.get(cache_key) + if cached is not None: + payload, expiry = cached + if expiry > _time.time(): + log.info("cache: hit key=%s q=%r", cache_key, q) + cached_listings_dicts = payload["listings"] + cached_market_price = payload["market_price"] + + if cached_listings_dicts is not None: + # Cache hit path: reconstruct listings as plain dicts (already serialised), + # re-run trust scorer against the local DB so per-user signals are fresh, + # and kick off background enrichment as normal. + import sqlite3 as _sqlite3 + + affiliate_active = bool(os.environ.get("EBAY_AFFILIATE_CAMPAIGN_ID", "").strip()) + session_id = str(uuid.uuid4()) + _update_queues[session_id] = _queue.SimpleQueue() + + try: + shared_store = Store(shared_db) + user_store = Store(user_db) + + # Re-hydrate Listing dataclass instances from the cached dicts so the + # scorer and DB calls receive proper typed objects. 
+ from app.db.models import Listing as _Listing + listings = [_Listing(**d) for d in cached_listings_dicts] + + # Re-save to user_store so staging fields are current for this session. + user_store.save_listings(listings) + staged = user_store.get_listings_staged("ebay", [l.platform_listing_id for l in listings]) + listings = [staged.get(l.platform_listing_id, l) for l in listings] + + # Fresh trust scores against local DB (not cached — user-specific). + scorer = TrustScorer(shared_store) + trust_scores_list = scorer.score_batch(listings, q) + user_store.save_trust_scores(trust_scores_list) + + features = compute_features(session.tier) + if features.photo_analysis: + _enqueue_vision_tasks(listings, trust_scores_list, session) + + trust_map = { + listing.platform_listing_id: dataclasses.asdict(ts) + for listing, ts in zip(listings, trust_scores_list) + if ts is not None + } + seller_map = { + listing.seller_platform_id: dataclasses.asdict( + shared_store.get_seller("ebay", listing.seller_platform_id) + ) + for listing in listings + if listing.seller_platform_id + and shared_store.get_seller("ebay", listing.seller_platform_id) + } + + _is_unauthed = session.user_id == "anonymous" or session.user_id.startswith("guest:") + _pref_store = None if _is_unauthed else user_store + + def _get_pref_cached(uid: Optional[str], path: str, default=None): + return _pref_store.get_user_preference(path, default=default) # type: ignore[union-attr] + + def _serialize_listing_cached(l: object) -> dict: + d = dataclasses.asdict(l) + d["url"] = _wrap_affiliate_url( + d["url"], + retailer="ebay", + user_id=None if _is_unauthed else session.user_id, + get_preference=_get_pref_cached if _pref_store is not None else None, + ) + return d + + # Kick off BTF enrichment so live score updates still flow. 
+ _trigger_scraper_enrichment( + listings, shared_store, shared_db, + user_db=user_db, query=comp_query, session_id=session_id, + ) + + return { + "listings": [_serialize_listing_cached(l) for l in listings], + "trust_scores": trust_map, + "sellers": seller_map, + "market_price": cached_market_price, + "adapter_used": adapter_used, + "affiliate_active": affiliate_active, + "session_id": session_id, + } + + except _sqlite3.OperationalError as e: + log.warning("search (cache hit) DB contention: %s", e) + _update_queues.pop(session_id, None) + return { + "listings": cached_listings_dicts, + "trust_scores": {}, + "sellers": {}, + "market_price": cached_market_price, + "adapter_used": adapter_used, + "affiliate_active": affiliate_active, + "session_id": None, + } + + # ── Cache miss — run full scrape ───────────────────────────────────────── + _evict_expired_cache() + log.info("cache: miss key=%s q=%r", cache_key, q) + # Each thread creates its own Store — sqlite3 check_same_thread=True. def _run_search(ebay_query: str) -> list: return _make_adapter(Store(shared_db), adapter).search(ebay_query, base_filters) @@ -796,6 +987,14 @@ def search( comp = shared_store.get_market_comp("ebay", query_hash) market_price = comp.median_price if comp else None + # Store raw listings (as dicts) + market_price in cache. + # Trust scores and seller enrichment are intentionally excluded — they + # incorporate per-user signals and must be computed fresh each time. 
+ _search_result_cache[cache_key] = ( + {"listings": [dataclasses.asdict(l) for l in listings], "market_price": market_price}, + _time.time() + _SEARCH_CACHE_TTL, + ) + # Serialize — keyed by platform_listing_id for easy Vue lookup trust_map = { listing.platform_listing_id: dataclasses.asdict(ts) @@ -873,6 +1072,7 @@ def search_async( must_exclude: str = "", category_id: str = "", adapter: str = "auto", + refresh: bool = False, # when True, bypass cache read (still writes fresh result) session: CloudUser = Depends(get_session), ): """Async variant of GET /api/search. @@ -923,10 +1123,11 @@ def search_async( _tier = session.tier _user_id = session.user_id _affiliate_active = bool(os.environ.get("EBAY_AFFILIATE_CAMPAIGN_ID", "").strip()) + _refresh = refresh # capture before the closure is dispatched def _background_search() -> None: """Run the full search pipeline and push SSE events to the queue.""" - import hashlib as _hashlib + import hashlib as _hashlib_local import sqlite3 as _sqlite3 q_norm = q # captured from outer scope @@ -965,6 +1166,100 @@ def search_async( if sq is not None: sq.put(event) + # ── Cache lookup (async background worker) ──────────────────────────── + async_cache_key = _cache_key( + q_norm, max_price, min_price, pages, + must_include, must_include_mode, must_exclude, category_id, + ) + + if not _refresh: + cached = _search_result_cache.get(async_cache_key) + if cached is not None: + payload, expiry = cached + if expiry > _time.time(): + log.info("cache: hit key=%s q=%r", async_cache_key, q_norm) + from app.db.models import Listing as _Listing + cached_listings_raw = payload["listings"] + cached_market_price = payload["market_price"] + try: + shared_store = Store(_shared_db) + user_store = Store(_user_db) + listings = [_Listing(**d) for d in cached_listings_raw] + user_store.save_listings(listings) + staged = user_store.get_listings_staged( + "ebay", [l.platform_listing_id for l in listings] + ) + listings = [staged.get(l.platform_listing_id, 
l) for l in listings] + + scorer = TrustScorer(shared_store) + trust_scores_list = scorer.score_batch(listings, q_norm) + user_store.save_trust_scores(trust_scores_list) + + features_obj = compute_features(_tier) + if features_obj.photo_analysis: + from api.cloud_session import CloudUser as _CloudUser + _sess_stub = _CloudUser( + user_id=_user_id, tier=_tier, + shared_db=_shared_db, user_db=_user_db, + ) + _enqueue_vision_tasks(listings, trust_scores_list, _sess_stub) + + trust_map = { + listing.platform_listing_id: dataclasses.asdict(ts) + for listing, ts in zip(listings, trust_scores_list) + if ts is not None + } + seller_map = { + listing.seller_platform_id: dataclasses.asdict( + shared_store.get_seller("ebay", listing.seller_platform_id) + ) + for listing in listings + if listing.seller_platform_id + and shared_store.get_seller("ebay", listing.seller_platform_id) + } + + _is_unauthed = _user_id == "anonymous" or _user_id.startswith("guest:") + _pref_store_hit = None if _is_unauthed else user_store + + def _get_pref_hit(uid: Optional[str], path: str, default=None): + return _pref_store_hit.get_user_preference(path, default=default) # type: ignore[union-attr] + + def _serialize_hit(l: object) -> dict: + d = dataclasses.asdict(l) + d["url"] = _wrap_affiliate_url( + d["url"], + retailer="ebay", + user_id=None if _is_unauthed else _user_id, + get_preference=_get_pref_hit if _pref_store_hit is not None else None, + ) + return d + + _push({ + "type": "listings", + "listings": [_serialize_hit(l) for l in listings], + "trust_scores": trust_map, + "sellers": seller_map, + "market_price": cached_market_price, + "adapter_used": adapter_used, + "affiliate_active": _affiliate_active, + "session_id": session_id, + }) + # Enrichment still runs so live score updates flow. 
+ _trigger_scraper_enrichment( + listings, shared_store, _shared_db, + user_db=_user_db, query=comp_query, session_id=session_id, + ) + return # done — no scraping needed + except Exception as exc: + log.warning( + "cache hit path failed, falling through to scrape: %s", exc + ) + # Fall through to full scrape below. + + # ── Cache miss — evict stale entries, then scrape ───────────────────── + _evict_expired_cache() + log.info("cache: miss key=%s q=%r", async_cache_key, q_norm) + try: def _run_search(ebay_query: str) -> list: return _make_adapter(Store(_shared_db), adapter).search(ebay_query, base_filters) @@ -1038,10 +1333,16 @@ def search_async( if features_obj.photo_analysis: _enqueue_vision_tasks(listings, trust_scores_list, _session_stub) - query_hash = _hashlib.md5(comp_query.encode()).hexdigest() + query_hash = _hashlib_local.md5(comp_query.encode()).hexdigest() comp = shared_store.get_market_comp("ebay", query_hash) market_price = comp.median_price if comp else None + # Store raw listings + market_price in cache (trust scores excluded). + _search_result_cache[async_cache_key] = ( + {"listings": [dataclasses.asdict(l) for l in listings], "market_price": market_price}, + _time.time() + _SEARCH_CACHE_TTL, + ) + trust_map = { listing.platform_listing_id: dataclasses.asdict(ts) for listing, ts in zip(listings, trust_scores_list) diff --git a/tests/test_search_cache.py b/tests/test_search_cache.py new file mode 100644 index 0000000..07ad48c --- /dev/null +++ b/tests/test_search_cache.py @@ -0,0 +1,402 @@ +"""Tests for the short-TTL search result cache in api/main.py. 
+ +Covers: + - _cache_key stability (same inputs → same key) + - _cache_key uniqueness (different inputs → different keys) + - cache hit path returns early without scraping (async worker) + - cache miss path stores result in _search_result_cache + - refresh=True bypasses cache read (still writes fresh result) + - TTL expiry: expired entries are not returned as hits + - _evict_expired_cache removes expired entries +""" +from __future__ import annotations + +import os +import queue as _queue +import time +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + +def _clear_cache(): + """Reset module-level cache state between tests.""" + import api.main as _main + _main._search_result_cache.clear() + _main._last_eviction_ts = 0.0 + + +@pytest.fixture(autouse=True) +def isolated_cache(): + """Ensure each test starts with an empty cache.""" + _clear_cache() + yield + _clear_cache() + + +@pytest.fixture +def client(tmp_path): + """TestClient backed by a fresh tmp DB.""" + os.environ["SNIPE_DB"] = str(tmp_path / "snipe.db") + from api.main import app + from fastapi.testclient import TestClient + return TestClient(app, raise_server_exceptions=False) + + +def _make_mock_listing(listing_id: str = "123456789", seller_id: str = "test_seller"): + """Return a MagicMock listing (for use where asdict() is NOT called on it).""" + m = MagicMock() + m.platform_listing_id = listing_id + m.seller_platform_id = seller_id + m.title = "Test GPU" + m.price = 100.0 + m.currency = "USD" + m.condition = "Used" + m.url = f"https://www.ebay.com/itm/{listing_id}" + m.photo_urls = [] + m.listing_age_days = 5 + m.buying_format = "fixed_price" + m.ends_at = None + m.fetched_at = None + m.trust_score_id = None + m.id = 1 + m.category_name = None + return m + + +def _make_real_listing(listing_id: str = "123456789", seller_id: str = "test_seller"): + """Return a real Listing dataclass 
instance (for use where asdict() is called).""" + from app.db.models import Listing + return Listing( + platform="ebay", + platform_listing_id=listing_id, + title="Test GPU", + price=100.0, + currency="USD", + condition="Used", + seller_platform_id=seller_id, + url=f"https://www.ebay.com/itm/{listing_id}", + photo_urls=[], + listing_age_days=5, + buying_format="fixed_price", + id=None, + ) + + +# ── _cache_key unit tests ───────────────────────────────────────────────────── + +def test_cache_key_stable_for_same_inputs(): + """The same parameter set always produces the same key.""" + from api.main import _cache_key + k1 = _cache_key("rtx 3080", 400.0, 100.0, 2, "rtx,3080", "all", "mining", "27386") + k2 = _cache_key("rtx 3080", 400.0, 100.0, 2, "rtx,3080", "all", "mining", "27386") + assert k1 == k2 + + +def test_cache_key_case_normalised(): + """Query is normalised to lower-case + stripped before hashing.""" + from api.main import _cache_key + k1 = _cache_key("RTX 3080", None, None, 1, "", "all", "", "") + k2 = _cache_key("rtx 3080", None, None, 1, "", "all", "", "") + assert k1 == k2 + + +def test_cache_key_differs_on_query_change(): + """Different query strings must produce different keys.""" + from api.main import _cache_key + k1 = _cache_key("rtx 3080", None, None, 1, "", "all", "", "") + k2 = _cache_key("gtx 1080", None, None, 1, "", "all", "", "") + assert k1 != k2 + + +def test_cache_key_differs_on_price_filter(): + """Different max_price must produce a different key.""" + from api.main import _cache_key + k1 = _cache_key("gpu", 400.0, None, 1, "", "all", "", "") + k2 = _cache_key("gpu", 500.0, None, 1, "", "all", "", "") + assert k1 != k2 + + +def test_cache_key_differs_on_min_price(): + """Different min_price must produce a different key.""" + from api.main import _cache_key + k1 = _cache_key("gpu", None, 50.0, 1, "", "all", "", "") + k2 = _cache_key("gpu", None, 100.0, 1, "", "all", "", "") + assert k1 != k2 + + +def test_cache_key_differs_on_pages(): + 
"""Different page count must produce a different key.""" + from api.main import _cache_key + k1 = _cache_key("gpu", None, None, 1, "", "all", "", "") + k2 = _cache_key("gpu", None, None, 2, "", "all", "", "") + assert k1 != k2 + + +def test_cache_key_differs_on_must_include(): + """Different must_include terms must produce a different key.""" + from api.main import _cache_key + k1 = _cache_key("gpu", None, None, 1, "rtx", "all", "", "") + k2 = _cache_key("gpu", None, None, 1, "gtx", "all", "", "") + assert k1 != k2 + + +def test_cache_key_differs_on_must_exclude(): + """Different must_exclude terms must produce a different key.""" + from api.main import _cache_key + k1 = _cache_key("gpu", None, None, 1, "", "all", "mining", "") + k2 = _cache_key("gpu", None, None, 1, "", "all", "defective", "") + assert k1 != k2 + + +def test_cache_key_differs_on_category_id(): + """Different category_id must produce a different key.""" + from api.main import _cache_key + k1 = _cache_key("gpu", None, None, 1, "", "all", "", "27386") + k2 = _cache_key("gpu", None, None, 1, "", "all", "", "12345") + assert k1 != k2 + + +def test_cache_key_is_16_chars(): + """Key must be exactly 16 hex characters.""" + from api.main import _cache_key + k = _cache_key("gpu", None, None, 1, "", "all", "", "") + assert len(k) == 16 + assert all(c in "0123456789abcdef" for c in k) + + +# ── TTL / eviction unit tests ───────────────────────────────────────────────── + +def test_expired_entry_is_not_returned_as_hit(): + """An entry past its TTL must not be treated as a cache hit.""" + import api.main as _main + from api.main import _cache_key + + key = _cache_key("gpu", None, None, 1, "", "all", "", "") + # Write an already-expired entry. 
+ _main._search_result_cache[key] = ( + {"listings": [], "market_price": None}, + time.time() - 1.0, # expired 1 second ago + ) + + cached = _main._search_result_cache.get(key) + assert cached is not None + payload, expiry = cached + # Simulate the hit-check used in main.py + assert expiry <= time.time(), "Entry should be expired" + + +def test_evict_expired_cache_removes_stale_entries(): + """_evict_expired_cache must remove entries whose expiry has passed.""" + import api.main as _main + from api.main import _cache_key, _evict_expired_cache + + key_expired = _cache_key("old query", None, None, 1, "", "all", "", "") + key_valid = _cache_key("new query", None, None, 1, "", "all", "", "") + + _main._search_result_cache[key_expired] = ( + {"listings": [], "market_price": None}, + time.time() - 10.0, # already expired + ) + _main._search_result_cache[key_valid] = ( + {"listings": [], "market_price": 99.0}, + time.time() + 300.0, # valid for 5 min + ) + + # Reset throttle so eviction runs immediately. + _main._last_eviction_ts = 0.0 + _evict_expired_cache() + + assert key_expired not in _main._search_result_cache + assert key_valid in _main._search_result_cache + + +def test_evict_is_rate_limited(): + """_evict_expired_cache should skip eviction if called within 60 s.""" + import api.main as _main + from api.main import _cache_key, _evict_expired_cache + + key_expired = _cache_key("stale", None, None, 1, "", "all", "", "") + _main._search_result_cache[key_expired] = ( + {"listings": [], "market_price": None}, + time.time() - 5.0, + ) + + # Pretend eviction just ran. + _main._last_eviction_ts = time.time() + _evict_expired_cache() + + # Entry should still be present because eviction was throttled. 
+ assert key_expired in _main._search_result_cache + + +# ── Integration tests — async endpoint cache hit ────────────────────────────── + +def test_async_cache_hit_skips_scraper(client, tmp_path): + """On a warm cache hit the scraper adapter must not be called.""" + import threading + import api.main as _main + from api.main import _cache_key + + # Pre-seed a valid cache entry. + key = _cache_key("rtx 3080", None, None, 1, "", "all", "", "") + _main._search_result_cache[key] = ( + {"listings": [], "market_price": 250.0}, + time.time() + 300.0, + ) + + scraper_called = threading.Event() + + def _fake_search(query, filters): + scraper_called.set() + return [] + + with ( + patch("api.main._make_adapter") as mock_adapter_factory, + patch("api.main._trigger_scraper_enrichment"), + patch("api.main.TrustScorer") as mock_scorer_cls, + patch("api.main.Store") as mock_store_cls, + ): + mock_adapter = MagicMock() + mock_adapter.search.side_effect = _fake_search + mock_adapter.get_completed_sales.return_value = None + mock_adapter_factory.return_value = mock_adapter + + mock_scorer = MagicMock() + mock_scorer.score_batch.return_value = [] + mock_scorer_cls.return_value = mock_scorer + + mock_store = MagicMock() + mock_store.get_listings_staged.return_value = {} + mock_store.refresh_seller_categories.return_value = 0 + mock_store.save_listings.return_value = None + mock_store.save_trust_scores.return_value = None + mock_store.get_market_comp.return_value = None + mock_store.get_seller.return_value = None + mock_store.get_user_preference.return_value = None + mock_store_cls.return_value = mock_store + + resp = client.get("/api/search/async?q=rtx+3080") + assert resp.status_code == 202 + + # Give the background worker a moment to run. + scraper_called.wait(timeout=3.0) + + # Scraper must NOT have been called on a cache hit. 
+ assert not scraper_called.is_set(), "Scraper was called despite a warm cache hit" + + +def test_async_cache_miss_stores_result(client, tmp_path): + """After a cache miss the result must be stored in _search_result_cache.""" + import threading + import api.main as _main + from api.main import _cache_key + + search_done = threading.Event() + real_listing = _make_real_listing() + + def _fake_search(query, filters): + return [real_listing] + + with ( + patch("api.main._make_adapter") as mock_adapter_factory, + patch("api.main._trigger_scraper_enrichment") as mock_enrich, + patch("api.main.TrustScorer") as mock_scorer_cls, + patch("api.main.Store") as mock_store_cls, + ): + mock_adapter = MagicMock() + mock_adapter.search.side_effect = _fake_search + mock_adapter.get_completed_sales.return_value = None + mock_adapter_factory.return_value = mock_adapter + + mock_scorer = MagicMock() + mock_scorer.score_batch.return_value = [] + mock_scorer_cls.return_value = mock_scorer + + mock_store = MagicMock() + mock_store.get_listings_staged.return_value = { + real_listing.platform_listing_id: real_listing + } + mock_store.refresh_seller_categories.return_value = 0 + mock_store.save_listings.return_value = None + mock_store.save_trust_scores.return_value = None + mock_store.get_market_comp.return_value = None + mock_store.get_seller.return_value = None + mock_store.get_user_preference.return_value = None + mock_store_cls.return_value = mock_store + + def _enrich_side_effect(*args, **kwargs): + search_done.set() + + mock_enrich.side_effect = _enrich_side_effect + + resp = client.get("/api/search/async?q=rtx+3080") + assert resp.status_code == 202 + + # Wait until the background worker reaches _trigger_scraper_enrichment. 
+ search_done.wait(timeout=5.0) + + assert search_done.is_set(), "Background search worker never completed" + + key = _cache_key("rtx 3080", None, None, 1, "", "all", "", "") + assert key in _main._search_result_cache, "Result was not stored in cache after miss" + payload, expiry = _main._search_result_cache[key] + assert expiry > time.time(), "Cache entry has already expired" + assert "listings" in payload + + +# ── Integration tests — async endpoint refresh=True ────────────────────────── + +def test_async_refresh_bypasses_cache_read(client, tmp_path): + """refresh=True must bypass cache read and invoke the scraper.""" + import threading + import api.main as _main + from api.main import _cache_key + + # Seed a valid cache entry so we can confirm it is bypassed. + key = _cache_key("rtx 3080", None, None, 1, "", "all", "", "") + _main._search_result_cache[key] = ( + {"listings": [], "market_price": 100.0}, + time.time() + 300.0, + ) + + scraper_called = threading.Event() + + def _fake_search(query, filters): + scraper_called.set() + return [] + + with ( + patch("api.main._make_adapter") as mock_adapter_factory, + patch("api.main._trigger_scraper_enrichment"), + patch("api.main.TrustScorer") as mock_scorer_cls, + patch("api.main.Store") as mock_store_cls, + ): + mock_adapter = MagicMock() + mock_adapter.search.side_effect = _fake_search + mock_adapter.get_completed_sales.return_value = None + mock_adapter_factory.return_value = mock_adapter + + mock_scorer = MagicMock() + mock_scorer.score_batch.return_value = [] + mock_scorer_cls.return_value = mock_scorer + + mock_store = MagicMock() + mock_store.get_listings_staged.return_value = {} + mock_store.refresh_seller_categories.return_value = 0 + mock_store.save_listings.return_value = None + mock_store.save_trust_scores.return_value = None + mock_store.get_market_comp.return_value = None + mock_store.get_seller.return_value = None + mock_store.get_user_preference.return_value = None + mock_store_cls.return_value = 
mock_store + + resp = client.get("/api/search/async?q=rtx+3080&refresh=true") + assert resp.status_code == 202 + + scraper_called.wait(timeout=5.0) + + assert scraper_called.is_set(), "Scraper was not called even though refresh=True"