Compare commits
2 commits
844721c6fd
...
29d2033ef2
| Author | SHA1 | Date | |
|---|---|---|---|
| 29d2033ef2 | |||
| a83e0957e2 |
2 changed files with 705 additions and 2 deletions
305
api/main.py
305
api/main.py
|
|
@ -5,12 +5,14 @@ import asyncio
|
|||
import csv
|
||||
import dataclasses
|
||||
import hashlib
|
||||
import hashlib as _hashlib
|
||||
import io
|
||||
import json as _json
|
||||
import logging
|
||||
import os
|
||||
import queue as _queue
|
||||
import re
|
||||
import time as _time
|
||||
import uuid
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from contextlib import asynccontextmanager
|
||||
|
|
@ -75,6 +77,61 @@ def _auth_label(user_id: str) -> str:
|
|||
_update_queues: dict[str, _queue.SimpleQueue] = {}
|
||||
|
||||
|
||||
# ── Short-TTL search result cache ────────────────────────────────────────────
|
||||
# Caches raw eBay listings and market_price only — trust scores are NOT cached
|
||||
# because they incorporate per-user signals (zero_feedback cap, etc.).
|
||||
# On cache hit the trust scorer and seller lookups run against the local DB as
|
||||
# normal; only the expensive Playwright/Browse API scrape is skipped.
|
||||
#
|
||||
# TTL is configurable via SEARCH_CACHE_TTL_S (default 300 s = 5 min).
|
||||
# Listings are public eBay data — safe to share across all users.
|
||||
|
||||
_SEARCH_CACHE_TTL = int(os.environ.get("SEARCH_CACHE_TTL_S", "300"))
|
||||
|
||||
# key → ({"listings": [...raw dicts...], "market_price": float|None}, expiry_ts)
|
||||
_search_result_cache: dict[str, tuple[dict, float]] = {}
|
||||
|
||||
# Throttle eviction sweeps to at most once per 60 s.
|
||||
_last_eviction_ts: float = 0.0
|
||||
|
||||
|
||||
def _cache_key(
|
||||
q: str,
|
||||
max_price: "float | None",
|
||||
min_price: "float | None",
|
||||
pages: int,
|
||||
must_include: str,
|
||||
must_include_mode: str,
|
||||
must_exclude: str,
|
||||
category_id: str,
|
||||
) -> str:
|
||||
"""Stable 16-char hex key for a search param set. Query is lower-cased + stripped."""
|
||||
raw = (
|
||||
f"{q.lower().strip()}|{max_price}|{min_price}|{pages}"
|
||||
f"|{must_include.lower().strip()}|{must_include_mode}"
|
||||
f"|{must_exclude.lower().strip()}|{category_id.strip()}"
|
||||
)
|
||||
return _hashlib.sha256(raw.encode()).hexdigest()[:16]
|
||||
|
||||
|
||||
def _evict_expired_cache() -> None:
    """Drop entries from ``_search_result_cache`` whose TTL has elapsed.

    Invoked opportunistically on cache misses.  A module-level timestamp
    throttles the sweep to at most one pass per 60 seconds so a burst of
    concurrent misses does not rescan the whole cache repeatedly.
    """
    global _last_eviction_ts
    sweep_ts = _time.time()
    if sweep_ts - _last_eviction_ts < 60.0:
        return  # throttled — a sweep already ran within the last minute
    _last_eviction_ts = sweep_ts
    stale_keys = [
        key
        for key, (_payload, expiry) in _search_result_cache.items()
        if expiry <= sweep_ts
    ]
    for key in stale_keys:
        _search_result_cache.pop(key, None)
    if stale_keys:
        log.debug("cache: evicted %d expired entries", len(stale_keys))
|
||||
|
||||
|
||||
# ── Community DB (optional — only active when COMMUNITY_DB_URL is set) ────────
|
||||
# Holds SnipeCommunityStore at module level so endpoints can publish signals
|
||||
# without constructing a new connection pool on every request.
|
||||
|
|
@ -97,6 +154,21 @@ def _get_query_translator():
|
|||
@asynccontextmanager
|
||||
async def _lifespan(app: FastAPI):
|
||||
global _community_store
|
||||
# Pre-warm the Chromium browser pool so the first scrape request does not
|
||||
# pay the full cold-start cost (5-10s Xvfb + browser launch).
|
||||
# Pool size is controlled via BROWSER_POOL_SIZE env var (default: 2).
|
||||
import threading as _threading
|
||||
from app.platforms.ebay.browser_pool import get_pool as _get_browser_pool
|
||||
_browser_pool = _get_browser_pool()
|
||||
_pool_thread = _threading.Thread(
|
||||
target=_browser_pool.start, daemon=True, name="browser-pool-start"
|
||||
)
|
||||
_pool_thread.start()
|
||||
log.info(
|
||||
"BrowserPool: pre-warm started in background (BROWSER_POOL_SIZE=%s)",
|
||||
os.environ.get("BROWSER_POOL_SIZE", "2"),
|
||||
)
|
||||
|
||||
# Start vision/LLM background task scheduler.
|
||||
# background_tasks queue lives in shared_db (cloud) or local_db (local)
|
||||
# so the scheduler has a single stable DB path across all cloud users.
|
||||
|
|
@ -204,6 +276,13 @@ async def _lifespan(app: FastAPI):
|
|||
get_scheduler(sched_db).shutdown(timeout=10.0)
|
||||
reset_scheduler()
|
||||
log.info("Snipe task scheduler stopped.")
|
||||
|
||||
# Drain and close all pre-warmed browser pool slots.
|
||||
try:
|
||||
_browser_pool.stop()
|
||||
except Exception:
|
||||
log.warning("BrowserPool: error during shutdown", exc_info=True)
|
||||
|
||||
if _community_store is not None:
|
||||
try:
|
||||
_community_store._db.close()
|
||||
|
|
@ -633,6 +712,7 @@ def search(
|
|||
must_exclude: str = "", # comma-separated; forwarded to eBay -term + client-side
|
||||
category_id: str = "", # eBay category ID — forwarded to Browse API / scraper _sacat
|
||||
adapter: str = "auto", # "auto" | "api" | "scraper" — override adapter selection
|
||||
refresh: bool = False, # when True, bypass cache read (still writes fresh result)
|
||||
session: CloudUser = Depends(get_session),
|
||||
):
|
||||
# If the user pasted an eBay listing or checkout URL, extract the item ID
|
||||
|
|
@ -685,6 +765,117 @@ def search(
|
|||
shared_db = session.shared_db
|
||||
user_db = session.user_db
|
||||
|
||||
# ── Cache lookup (synchronous endpoint) ──────────────────────────────────
|
||||
cache_key = _cache_key(q, max_price, min_price, pages, must_include, must_include_mode, must_exclude, category_id)
|
||||
|
||||
cached_listings_dicts: "list | None" = None
|
||||
cached_market_price: "float | None" = None
|
||||
|
||||
if not refresh:
|
||||
cached = _search_result_cache.get(cache_key)
|
||||
if cached is not None:
|
||||
payload, expiry = cached
|
||||
if expiry > _time.time():
|
||||
log.info("cache: hit key=%s q=%r", cache_key, q)
|
||||
cached_listings_dicts = payload["listings"]
|
||||
cached_market_price = payload["market_price"]
|
||||
|
||||
if cached_listings_dicts is not None:
|
||||
# Cache hit path: reconstruct listings as plain dicts (already serialised),
|
||||
# re-run trust scorer against the local DB so per-user signals are fresh,
|
||||
# and kick off background enrichment as normal.
|
||||
import sqlite3 as _sqlite3
|
||||
|
||||
affiliate_active = bool(os.environ.get("EBAY_AFFILIATE_CAMPAIGN_ID", "").strip())
|
||||
session_id = str(uuid.uuid4())
|
||||
_update_queues[session_id] = _queue.SimpleQueue()
|
||||
|
||||
try:
|
||||
shared_store = Store(shared_db)
|
||||
user_store = Store(user_db)
|
||||
|
||||
# Re-hydrate Listing dataclass instances from the cached dicts so the
|
||||
# scorer and DB calls receive proper typed objects.
|
||||
from app.db.models import Listing as _Listing
|
||||
listings = [_Listing(**d) for d in cached_listings_dicts]
|
||||
|
||||
# Re-save to user_store so staging fields are current for this session.
|
||||
user_store.save_listings(listings)
|
||||
staged = user_store.get_listings_staged("ebay", [l.platform_listing_id for l in listings])
|
||||
listings = [staged.get(l.platform_listing_id, l) for l in listings]
|
||||
|
||||
# Fresh trust scores against local DB (not cached — user-specific).
|
||||
scorer = TrustScorer(shared_store)
|
||||
trust_scores_list = scorer.score_batch(listings, q)
|
||||
user_store.save_trust_scores(trust_scores_list)
|
||||
|
||||
features = compute_features(session.tier)
|
||||
if features.photo_analysis:
|
||||
_enqueue_vision_tasks(listings, trust_scores_list, session)
|
||||
|
||||
trust_map = {
|
||||
listing.platform_listing_id: dataclasses.asdict(ts)
|
||||
for listing, ts in zip(listings, trust_scores_list)
|
||||
if ts is not None
|
||||
}
|
||||
seller_map = {
|
||||
listing.seller_platform_id: dataclasses.asdict(
|
||||
shared_store.get_seller("ebay", listing.seller_platform_id)
|
||||
)
|
||||
for listing in listings
|
||||
if listing.seller_platform_id
|
||||
and shared_store.get_seller("ebay", listing.seller_platform_id)
|
||||
}
|
||||
|
||||
_is_unauthed = session.user_id == "anonymous" or session.user_id.startswith("guest:")
|
||||
_pref_store = None if _is_unauthed else user_store
|
||||
|
||||
def _get_pref_cached(uid: Optional[str], path: str, default=None):
|
||||
return _pref_store.get_user_preference(path, default=default) # type: ignore[union-attr]
|
||||
|
||||
def _serialize_listing_cached(l: object) -> dict:
|
||||
d = dataclasses.asdict(l)
|
||||
d["url"] = _wrap_affiliate_url(
|
||||
d["url"],
|
||||
retailer="ebay",
|
||||
user_id=None if _is_unauthed else session.user_id,
|
||||
get_preference=_get_pref_cached if _pref_store is not None else None,
|
||||
)
|
||||
return d
|
||||
|
||||
# Kick off BTF enrichment so live score updates still flow.
|
||||
_trigger_scraper_enrichment(
|
||||
listings, shared_store, shared_db,
|
||||
user_db=user_db, query=comp_query, session_id=session_id,
|
||||
)
|
||||
|
||||
return {
|
||||
"listings": [_serialize_listing_cached(l) for l in listings],
|
||||
"trust_scores": trust_map,
|
||||
"sellers": seller_map,
|
||||
"market_price": cached_market_price,
|
||||
"adapter_used": adapter_used,
|
||||
"affiliate_active": affiliate_active,
|
||||
"session_id": session_id,
|
||||
}
|
||||
|
||||
except _sqlite3.OperationalError as e:
|
||||
log.warning("search (cache hit) DB contention: %s", e)
|
||||
_update_queues.pop(session_id, None)
|
||||
return {
|
||||
"listings": cached_listings_dicts,
|
||||
"trust_scores": {},
|
||||
"sellers": {},
|
||||
"market_price": cached_market_price,
|
||||
"adapter_used": adapter_used,
|
||||
"affiliate_active": affiliate_active,
|
||||
"session_id": None,
|
||||
}
|
||||
|
||||
# ── Cache miss — run full scrape ─────────────────────────────────────────
|
||||
_evict_expired_cache()
|
||||
log.info("cache: miss key=%s q=%r", cache_key, q)
|
||||
|
||||
# Each thread creates its own Store — sqlite3 check_same_thread=True.
|
||||
def _run_search(ebay_query: str) -> list:
|
||||
return _make_adapter(Store(shared_db), adapter).search(ebay_query, base_filters)
|
||||
|
|
@ -796,6 +987,14 @@ def search(
|
|||
comp = shared_store.get_market_comp("ebay", query_hash)
|
||||
market_price = comp.median_price if comp else None
|
||||
|
||||
# Store raw listings (as dicts) + market_price in cache.
|
||||
# Trust scores and seller enrichment are intentionally excluded — they
|
||||
# incorporate per-user signals and must be computed fresh each time.
|
||||
_search_result_cache[cache_key] = (
|
||||
{"listings": [dataclasses.asdict(l) for l in listings], "market_price": market_price},
|
||||
_time.time() + _SEARCH_CACHE_TTL,
|
||||
)
|
||||
|
||||
# Serialize — keyed by platform_listing_id for easy Vue lookup
|
||||
trust_map = {
|
||||
listing.platform_listing_id: dataclasses.asdict(ts)
|
||||
|
|
@ -873,6 +1072,7 @@ def search_async(
|
|||
must_exclude: str = "",
|
||||
category_id: str = "",
|
||||
adapter: str = "auto",
|
||||
refresh: bool = False, # when True, bypass cache read (still writes fresh result)
|
||||
session: CloudUser = Depends(get_session),
|
||||
):
|
||||
"""Async variant of GET /api/search.
|
||||
|
|
@ -923,10 +1123,11 @@ def search_async(
|
|||
_tier = session.tier
|
||||
_user_id = session.user_id
|
||||
_affiliate_active = bool(os.environ.get("EBAY_AFFILIATE_CAMPAIGN_ID", "").strip())
|
||||
_refresh = refresh # capture before the closure is dispatched
|
||||
|
||||
def _background_search() -> None:
|
||||
"""Run the full search pipeline and push SSE events to the queue."""
|
||||
import hashlib as _hashlib
|
||||
import hashlib as _hashlib_local
|
||||
import sqlite3 as _sqlite3
|
||||
|
||||
q_norm = q # captured from outer scope
|
||||
|
|
@ -965,6 +1166,100 @@ def search_async(
|
|||
if sq is not None:
|
||||
sq.put(event)
|
||||
|
||||
# ── Cache lookup (async background worker) ────────────────────────────
|
||||
async_cache_key = _cache_key(
|
||||
q_norm, max_price, min_price, pages,
|
||||
must_include, must_include_mode, must_exclude, category_id,
|
||||
)
|
||||
|
||||
if not _refresh:
|
||||
cached = _search_result_cache.get(async_cache_key)
|
||||
if cached is not None:
|
||||
payload, expiry = cached
|
||||
if expiry > _time.time():
|
||||
log.info("cache: hit key=%s q=%r", async_cache_key, q_norm)
|
||||
from app.db.models import Listing as _Listing
|
||||
cached_listings_raw = payload["listings"]
|
||||
cached_market_price = payload["market_price"]
|
||||
try:
|
||||
shared_store = Store(_shared_db)
|
||||
user_store = Store(_user_db)
|
||||
listings = [_Listing(**d) for d in cached_listings_raw]
|
||||
user_store.save_listings(listings)
|
||||
staged = user_store.get_listings_staged(
|
||||
"ebay", [l.platform_listing_id for l in listings]
|
||||
)
|
||||
listings = [staged.get(l.platform_listing_id, l) for l in listings]
|
||||
|
||||
scorer = TrustScorer(shared_store)
|
||||
trust_scores_list = scorer.score_batch(listings, q_norm)
|
||||
user_store.save_trust_scores(trust_scores_list)
|
||||
|
||||
features_obj = compute_features(_tier)
|
||||
if features_obj.photo_analysis:
|
||||
from api.cloud_session import CloudUser as _CloudUser
|
||||
_sess_stub = _CloudUser(
|
||||
user_id=_user_id, tier=_tier,
|
||||
shared_db=_shared_db, user_db=_user_db,
|
||||
)
|
||||
_enqueue_vision_tasks(listings, trust_scores_list, _sess_stub)
|
||||
|
||||
trust_map = {
|
||||
listing.platform_listing_id: dataclasses.asdict(ts)
|
||||
for listing, ts in zip(listings, trust_scores_list)
|
||||
if ts is not None
|
||||
}
|
||||
seller_map = {
|
||||
listing.seller_platform_id: dataclasses.asdict(
|
||||
shared_store.get_seller("ebay", listing.seller_platform_id)
|
||||
)
|
||||
for listing in listings
|
||||
if listing.seller_platform_id
|
||||
and shared_store.get_seller("ebay", listing.seller_platform_id)
|
||||
}
|
||||
|
||||
_is_unauthed = _user_id == "anonymous" or _user_id.startswith("guest:")
|
||||
_pref_store_hit = None if _is_unauthed else user_store
|
||||
|
||||
def _get_pref_hit(uid: Optional[str], path: str, default=None):
|
||||
return _pref_store_hit.get_user_preference(path, default=default) # type: ignore[union-attr]
|
||||
|
||||
def _serialize_hit(l: object) -> dict:
|
||||
d = dataclasses.asdict(l)
|
||||
d["url"] = _wrap_affiliate_url(
|
||||
d["url"],
|
||||
retailer="ebay",
|
||||
user_id=None if _is_unauthed else _user_id,
|
||||
get_preference=_get_pref_hit if _pref_store_hit is not None else None,
|
||||
)
|
||||
return d
|
||||
|
||||
_push({
|
||||
"type": "listings",
|
||||
"listings": [_serialize_hit(l) for l in listings],
|
||||
"trust_scores": trust_map,
|
||||
"sellers": seller_map,
|
||||
"market_price": cached_market_price,
|
||||
"adapter_used": adapter_used,
|
||||
"affiliate_active": _affiliate_active,
|
||||
"session_id": session_id,
|
||||
})
|
||||
# Enrichment still runs so live score updates flow.
|
||||
_trigger_scraper_enrichment(
|
||||
listings, shared_store, _shared_db,
|
||||
user_db=_user_db, query=comp_query, session_id=session_id,
|
||||
)
|
||||
return # done — no scraping needed
|
||||
except Exception as exc:
|
||||
log.warning(
|
||||
"cache hit path failed, falling through to scrape: %s", exc
|
||||
)
|
||||
# Fall through to full scrape below.
|
||||
|
||||
# ── Cache miss — evict stale entries, then scrape ─────────────────────
|
||||
_evict_expired_cache()
|
||||
log.info("cache: miss key=%s q=%r", async_cache_key, q_norm)
|
||||
|
||||
try:
|
||||
def _run_search(ebay_query: str) -> list:
|
||||
return _make_adapter(Store(_shared_db), adapter).search(ebay_query, base_filters)
|
||||
|
|
@ -1038,10 +1333,16 @@ def search_async(
|
|||
if features_obj.photo_analysis:
|
||||
_enqueue_vision_tasks(listings, trust_scores_list, _session_stub)
|
||||
|
||||
query_hash = _hashlib.md5(comp_query.encode()).hexdigest()
|
||||
query_hash = _hashlib_local.md5(comp_query.encode()).hexdigest()
|
||||
comp = shared_store.get_market_comp("ebay", query_hash)
|
||||
market_price = comp.median_price if comp else None
|
||||
|
||||
# Store raw listings + market_price in cache (trust scores excluded).
|
||||
_search_result_cache[async_cache_key] = (
|
||||
{"listings": [dataclasses.asdict(l) for l in listings], "market_price": market_price},
|
||||
_time.time() + _SEARCH_CACHE_TTL,
|
||||
)
|
||||
|
||||
trust_map = {
|
||||
listing.platform_listing_id: dataclasses.asdict(ts)
|
||||
for listing, ts in zip(listings, trust_scores_list)
|
||||
|
|
|
|||
402
tests/test_search_cache.py
Normal file
402
tests/test_search_cache.py
Normal file
|
|
@ -0,0 +1,402 @@
|
|||
"""Tests for the short-TTL search result cache in api/main.py.
|
||||
|
||||
Covers:
|
||||
- _cache_key stability (same inputs → same key)
|
||||
- _cache_key uniqueness (different inputs → different keys)
|
||||
- cache hit path returns early without scraping (async worker)
|
||||
- cache miss path stores result in _search_result_cache
|
||||
- refresh=True bypasses cache read (still writes fresh result)
|
||||
- TTL expiry: expired entries are not returned as hits
|
||||
- _evict_expired_cache removes expired entries
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import queue as _queue
|
||||
import time
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||||
|
||||
def _clear_cache():
    """Wipe the module-level search cache and re-arm the eviction throttle."""
    from api import main as _main

    _main._search_result_cache.clear()
    _main._last_eviction_ts = 0.0
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
def isolated_cache():
    """Give every test a pristine, empty search cache."""
    _clear_cache()  # before the test body runs
    yield
    _clear_cache()  # after the test body finishes
|
||||
|
||||
|
||||
@pytest.fixture
def client(tmp_path):
    """TestClient whose app is backed by a throwaway per-test database."""
    db_path = tmp_path / "snipe.db"
    os.environ["SNIPE_DB"] = str(db_path)
    # Import lazily so SNIPE_DB is set before api.main reads it.
    from api.main import app
    from fastapi.testclient import TestClient
    return TestClient(app, raise_server_exceptions=False)
|
||||
|
||||
|
||||
def _make_mock_listing(listing_id: str = "123456789", seller_id: str = "test_seller"):
|
||||
"""Return a MagicMock listing (for use where asdict() is NOT called on it)."""
|
||||
m = MagicMock()
|
||||
m.platform_listing_id = listing_id
|
||||
m.seller_platform_id = seller_id
|
||||
m.title = "Test GPU"
|
||||
m.price = 100.0
|
||||
m.currency = "USD"
|
||||
m.condition = "Used"
|
||||
m.url = f"https://www.ebay.com/itm/{listing_id}"
|
||||
m.photo_urls = []
|
||||
m.listing_age_days = 5
|
||||
m.buying_format = "fixed_price"
|
||||
m.ends_at = None
|
||||
m.fetched_at = None
|
||||
m.trust_score_id = None
|
||||
m.id = 1
|
||||
m.category_name = None
|
||||
return m
|
||||
|
||||
|
||||
def _make_real_listing(listing_id: str = "123456789", seller_id: str = "test_seller"):
    """Return a real Listing dataclass instance (for use where asdict() is called)."""
    from app.db.models import Listing

    fields = dict(
        platform="ebay",
        platform_listing_id=listing_id,
        title="Test GPU",
        price=100.0,
        currency="USD",
        condition="Used",
        seller_platform_id=seller_id,
        url=f"https://www.ebay.com/itm/{listing_id}",
        photo_urls=[],
        listing_age_days=5,
        buying_format="fixed_price",
        id=None,
    )
    return Listing(**fields)
|
||||
|
||||
|
||||
# ── _cache_key unit tests ─────────────────────────────────────────────────────
|
||||
|
||||
def test_cache_key_stable_for_same_inputs():
    """The same parameter set always produces the same key."""
    from api.main import _cache_key
    params = ("rtx 3080", 400.0, 100.0, 2, "rtx,3080", "all", "mining", "27386")
    assert _cache_key(*params) == _cache_key(*params)


def test_cache_key_case_normalised():
    """Query casing is irrelevant: the key is computed on the lower-cased query."""
    from api.main import _cache_key
    upper = _cache_key("RTX 3080", None, None, 1, "", "all", "", "")
    lower = _cache_key("rtx 3080", None, None, 1, "", "all", "", "")
    assert upper == lower


def test_cache_key_differs_on_query_change():
    """Distinct query strings map to distinct keys."""
    from api.main import _cache_key
    assert _cache_key("rtx 3080", None, None, 1, "", "all", "", "") != _cache_key(
        "gtx 1080", None, None, 1, "", "all", "", ""
    )


def test_cache_key_differs_on_price_filter():
    """Changing max_price changes the key."""
    from api.main import _cache_key
    assert _cache_key("gpu", 400.0, None, 1, "", "all", "", "") != _cache_key(
        "gpu", 500.0, None, 1, "", "all", "", ""
    )


def test_cache_key_differs_on_min_price():
    """Changing min_price changes the key."""
    from api.main import _cache_key
    assert _cache_key("gpu", None, 50.0, 1, "", "all", "", "") != _cache_key(
        "gpu", None, 100.0, 1, "", "all", "", ""
    )


def test_cache_key_differs_on_pages():
    """Changing the page count changes the key."""
    from api.main import _cache_key
    assert _cache_key("gpu", None, None, 1, "", "all", "", "") != _cache_key(
        "gpu", None, None, 2, "", "all", "", ""
    )


def test_cache_key_differs_on_must_include():
    """Changing must_include terms changes the key."""
    from api.main import _cache_key
    assert _cache_key("gpu", None, None, 1, "rtx", "all", "", "") != _cache_key(
        "gpu", None, None, 1, "gtx", "all", "", ""
    )


def test_cache_key_differs_on_must_exclude():
    """Changing must_exclude terms changes the key."""
    from api.main import _cache_key
    assert _cache_key("gpu", None, None, 1, "", "all", "mining", "") != _cache_key(
        "gpu", None, None, 1, "", "all", "defective", ""
    )


def test_cache_key_differs_on_category_id():
    """Changing category_id changes the key."""
    from api.main import _cache_key
    assert _cache_key("gpu", None, None, 1, "", "all", "", "27386") != _cache_key(
        "gpu", None, None, 1, "", "all", "", "12345"
    )


def test_cache_key_is_16_chars():
    """Keys are exactly 16 lowercase hex characters."""
    from api.main import _cache_key
    key = _cache_key("gpu", None, None, 1, "", "all", "", "")
    assert len(key) == 16
    assert set(key) <= set("0123456789abcdef")
|
||||
|
||||
|
||||
# ── TTL / eviction unit tests ─────────────────────────────────────────────────
|
||||
|
||||
def test_expired_entry_is_not_returned_as_hit():
    """An entry past its TTL must not be treated as a cache hit."""
    import api.main as _main
    from api.main import _cache_key

    key = _cache_key("gpu", None, None, 1, "", "all", "", "")
    # Write an entry whose expiry timestamp is already in the past.
    stale_payload = {"listings": [], "market_price": None}
    _main._search_result_cache[key] = (stale_payload, time.time() - 1.0)

    entry = _main._search_result_cache.get(key)
    assert entry is not None
    _payload, expiry = entry
    # Mirror the expiry comparison performed by the endpoint code.
    assert expiry <= time.time(), "Entry should be expired"


def test_evict_expired_cache_removes_stale_entries():
    """_evict_expired_cache must drop expired entries and keep live ones."""
    import api.main as _main
    from api.main import _cache_key, _evict_expired_cache

    stale_key = _cache_key("old query", None, None, 1, "", "all", "", "")
    live_key = _cache_key("new query", None, None, 1, "", "all", "", "")

    _main._search_result_cache[stale_key] = (
        {"listings": [], "market_price": None},
        time.time() - 10.0,  # already past its TTL
    )
    _main._search_result_cache[live_key] = (
        {"listings": [], "market_price": 99.0},
        time.time() + 300.0,  # still valid for 5 minutes
    )

    _main._last_eviction_ts = 0.0  # disarm the 60 s throttle
    _evict_expired_cache()

    assert stale_key not in _main._search_result_cache
    assert live_key in _main._search_result_cache


def test_evict_is_rate_limited():
    """A sweep requested within 60 s of the previous one must be a no-op."""
    import api.main as _main
    from api.main import _cache_key, _evict_expired_cache

    stale_key = _cache_key("stale", None, None, 1, "", "all", "", "")
    _main._search_result_cache[stale_key] = (
        {"listings": [], "market_price": None},
        time.time() - 5.0,
    )

    _main._last_eviction_ts = time.time()  # pretend a sweep just ran
    _evict_expired_cache()

    # Still present: the throttle suppressed the sweep.
    assert stale_key in _main._search_result_cache
|
||||
|
||||
|
||||
# ── Integration tests — async endpoint cache hit ──────────────────────────────
|
||||
|
||||
def test_async_cache_hit_skips_scraper(client, tmp_path):
    """On a warm cache hit the scraper adapter must not be called."""
    import threading
    import api.main as _main
    from api.main import _cache_key

    # Pre-seed a valid cache entry.
    # An empty listings payload is enough — the hit path only needs the entry
    # to exist and be unexpired for the worker to skip the adapter entirely.
    key = _cache_key("rtx 3080", None, None, 1, "", "all", "", "")
    _main._search_result_cache[key] = (
        {"listings": [], "market_price": 250.0},
        time.time() + 300.0,
    )

    # Set by the fake adapter if the scrape path is (wrongly) taken.
    scraper_called = threading.Event()

    def _fake_search(query, filters):
        scraper_called.set()
        return []

    with (
        patch("api.main._make_adapter") as mock_adapter_factory,
        patch("api.main._trigger_scraper_enrichment"),
        patch("api.main.TrustScorer") as mock_scorer_cls,
        patch("api.main.Store") as mock_store_cls,
    ):
        mock_adapter = MagicMock()
        mock_adapter.search.side_effect = _fake_search
        mock_adapter.get_completed_sales.return_value = None
        mock_adapter_factory.return_value = mock_adapter

        mock_scorer = MagicMock()
        mock_scorer.score_batch.return_value = []
        mock_scorer_cls.return_value = mock_scorer

        # Stub out every Store call the worker makes on the hit path so the
        # test never touches a real database.
        mock_store = MagicMock()
        mock_store.get_listings_staged.return_value = {}
        mock_store.refresh_seller_categories.return_value = 0
        mock_store.save_listings.return_value = None
        mock_store.save_trust_scores.return_value = None
        mock_store.get_market_comp.return_value = None
        mock_store.get_seller.return_value = None
        mock_store.get_user_preference.return_value = None
        mock_store_cls.return_value = mock_store

        resp = client.get("/api/search/async?q=rtx+3080")
        assert resp.status_code == 202

        # Give the background worker a moment to run.
        # NOTE(review): in the passing case this always burns the full 3 s
        # (the event is never set) — a completion signal would be faster.
        scraper_called.wait(timeout=3.0)

        # Scraper must NOT have been called on a cache hit.
        assert not scraper_called.is_set(), "Scraper was called despite a warm cache hit"


def test_async_cache_miss_stores_result(client, tmp_path):
    """After a cache miss the result must be stored in _search_result_cache."""
    import threading
    import api.main as _main
    from api.main import _cache_key

    search_done = threading.Event()
    # A real Listing dataclass is required here: the miss path calls
    # dataclasses.asdict() on each listing before writing to the cache.
    real_listing = _make_real_listing()

    def _fake_search(query, filters):
        return [real_listing]

    with (
        patch("api.main._make_adapter") as mock_adapter_factory,
        patch("api.main._trigger_scraper_enrichment") as mock_enrich,
        patch("api.main.TrustScorer") as mock_scorer_cls,
        patch("api.main.Store") as mock_store_cls,
    ):
        mock_adapter = MagicMock()
        mock_adapter.search.side_effect = _fake_search
        mock_adapter.get_completed_sales.return_value = None
        mock_adapter_factory.return_value = mock_adapter

        mock_scorer = MagicMock()
        mock_scorer.score_batch.return_value = []
        mock_scorer_cls.return_value = mock_scorer

        mock_store = MagicMock()
        # Return the real listing from staging so downstream asdict() calls
        # operate on a dataclass rather than a MagicMock.
        mock_store.get_listings_staged.return_value = {
            real_listing.platform_listing_id: real_listing
        }
        mock_store.refresh_seller_categories.return_value = 0
        mock_store.save_listings.return_value = None
        mock_store.save_trust_scores.return_value = None
        mock_store.get_market_comp.return_value = None
        mock_store.get_seller.return_value = None
        mock_store.get_user_preference.return_value = None
        mock_store_cls.return_value = mock_store

        # _trigger_scraper_enrichment runs after the cache write, so it
        # doubles as a completion signal for the background worker.
        def _enrich_side_effect(*args, **kwargs):
            search_done.set()

        mock_enrich.side_effect = _enrich_side_effect

        resp = client.get("/api/search/async?q=rtx+3080")
        assert resp.status_code == 202

        # Wait until the background worker reaches _trigger_scraper_enrichment.
        search_done.wait(timeout=5.0)

        assert search_done.is_set(), "Background search worker never completed"

        key = _cache_key("rtx 3080", None, None, 1, "", "all", "", "")
        assert key in _main._search_result_cache, "Result was not stored in cache after miss"
        payload, expiry = _main._search_result_cache[key]
        assert expiry > time.time(), "Cache entry has already expired"
        assert "listings" in payload


# ── Integration tests — async endpoint refresh=True ──────────────────────────

def test_async_refresh_bypasses_cache_read(client, tmp_path):
    """refresh=True must bypass cache read and invoke the scraper."""
    import threading
    import api.main as _main
    from api.main import _cache_key

    # Seed a valid cache entry so we can confirm it is bypassed.
    key = _cache_key("rtx 3080", None, None, 1, "", "all", "", "")
    _main._search_result_cache[key] = (
        {"listings": [], "market_price": 100.0},
        time.time() + 300.0,
    )

    # Set by the fake adapter when the scrape path runs — expected here.
    scraper_called = threading.Event()

    def _fake_search(query, filters):
        scraper_called.set()
        return []

    with (
        patch("api.main._make_adapter") as mock_adapter_factory,
        patch("api.main._trigger_scraper_enrichment"),
        patch("api.main.TrustScorer") as mock_scorer_cls,
        patch("api.main.Store") as mock_store_cls,
    ):
        mock_adapter = MagicMock()
        mock_adapter.search.side_effect = _fake_search
        mock_adapter.get_completed_sales.return_value = None
        mock_adapter_factory.return_value = mock_adapter

        mock_scorer = MagicMock()
        mock_scorer.score_batch.return_value = []
        mock_scorer_cls.return_value = mock_scorer

        mock_store = MagicMock()
        mock_store.get_listings_staged.return_value = {}
        mock_store.refresh_seller_categories.return_value = 0
        mock_store.save_listings.return_value = None
        mock_store.save_trust_scores.return_value = None
        mock_store.get_market_comp.return_value = None
        mock_store.get_seller.return_value = None
        mock_store.get_user_preference.return_value = None
        mock_store_cls.return_value = mock_store

        resp = client.get("/api/search/async?q=rtx+3080&refresh=true")
        assert resp.status_code == 202

        # Here the event SHOULD fire, so wait() normally returns early.
        scraper_called.wait(timeout=5.0)

        assert scraper_called.is_set(), "Scraper was not called even though refresh=True"
|
||||
Loading…
Reference in a new issue