commit
29d2033ef2
2 changed files with 705 additions and 2 deletions
305
api/main.py
305
api/main.py
|
|
@ -5,12 +5,14 @@ import asyncio
|
||||||
import csv
|
import csv
|
||||||
import dataclasses
|
import dataclasses
|
||||||
import hashlib
|
import hashlib
|
||||||
|
import hashlib as _hashlib
|
||||||
import io
|
import io
|
||||||
import json as _json
|
import json as _json
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import queue as _queue
|
import queue as _queue
|
||||||
import re
|
import re
|
||||||
|
import time as _time
|
||||||
import uuid
|
import uuid
|
||||||
from concurrent.futures import ThreadPoolExecutor
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
|
|
@ -75,6 +77,61 @@ def _auth_label(user_id: str) -> str:
|
||||||
_update_queues: dict[str, _queue.SimpleQueue] = {}
|
_update_queues: dict[str, _queue.SimpleQueue] = {}
|
||||||
|
|
||||||
|
|
||||||
|
# ── Short-TTL search result cache ────────────────────────────────────────────
|
||||||
|
# Caches raw eBay listings and market_price only — trust scores are NOT cached
|
||||||
|
# because they incorporate per-user signals (zero_feedback cap, etc.).
|
||||||
|
# On cache hit the trust scorer and seller lookups run against the local DB as
|
||||||
|
# normal; only the expensive Playwright/Browse API scrape is skipped.
|
||||||
|
#
|
||||||
|
# TTL is configurable via SEARCH_CACHE_TTL_S (default 300 s = 5 min).
|
||||||
|
# Listings are public eBay data — safe to share across all users.
|
||||||
|
|
||||||
|
_SEARCH_CACHE_TTL = int(os.environ.get("SEARCH_CACHE_TTL_S", "300"))
|
||||||
|
|
||||||
|
# key → ({"listings": [...raw dicts...], "market_price": float|None}, expiry_ts)
|
||||||
|
_search_result_cache: dict[str, tuple[dict, float]] = {}
|
||||||
|
|
||||||
|
# Throttle eviction sweeps to at most once per 60 s.
|
||||||
|
_last_eviction_ts: float = 0.0
|
||||||
|
|
||||||
|
|
||||||
|
def _cache_key(
|
||||||
|
q: str,
|
||||||
|
max_price: "float | None",
|
||||||
|
min_price: "float | None",
|
||||||
|
pages: int,
|
||||||
|
must_include: str,
|
||||||
|
must_include_mode: str,
|
||||||
|
must_exclude: str,
|
||||||
|
category_id: str,
|
||||||
|
) -> str:
|
||||||
|
"""Stable 16-char hex key for a search param set. Query is lower-cased + stripped."""
|
||||||
|
raw = (
|
||||||
|
f"{q.lower().strip()}|{max_price}|{min_price}|{pages}"
|
||||||
|
f"|{must_include.lower().strip()}|{must_include_mode}"
|
||||||
|
f"|{must_exclude.lower().strip()}|{category_id.strip()}"
|
||||||
|
)
|
||||||
|
return _hashlib.sha256(raw.encode()).hexdigest()[:16]
|
||||||
|
|
||||||
|
|
||||||
|
def _evict_expired_cache() -> None:
|
||||||
|
"""Remove stale entries from _search_result_cache.
|
||||||
|
|
||||||
|
Called opportunistically on each cache miss; rate-limited to once per 60 s
|
||||||
|
to avoid quadratic overhead when many concurrent misses arrive at once.
|
||||||
|
"""
|
||||||
|
global _last_eviction_ts
|
||||||
|
now = _time.time()
|
||||||
|
if now - _last_eviction_ts < 60.0:
|
||||||
|
return
|
||||||
|
_last_eviction_ts = now
|
||||||
|
expired = [k for k, (_, exp) in _search_result_cache.items() if exp <= now]
|
||||||
|
for k in expired:
|
||||||
|
_search_result_cache.pop(k, None)
|
||||||
|
if expired:
|
||||||
|
log.debug("cache: evicted %d expired entries", len(expired))
|
||||||
|
|
||||||
|
|
||||||
# ── Community DB (optional — only active when COMMUNITY_DB_URL is set) ────────
|
# ── Community DB (optional — only active when COMMUNITY_DB_URL is set) ────────
|
||||||
# Holds SnipeCommunityStore at module level so endpoints can publish signals
|
# Holds SnipeCommunityStore at module level so endpoints can publish signals
|
||||||
# without constructing a new connection pool on every request.
|
# without constructing a new connection pool on every request.
|
||||||
|
|
@ -97,6 +154,21 @@ def _get_query_translator():
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def _lifespan(app: FastAPI):
|
async def _lifespan(app: FastAPI):
|
||||||
global _community_store
|
global _community_store
|
||||||
|
# Pre-warm the Chromium browser pool so the first scrape request does not
|
||||||
|
# pay the full cold-start cost (5-10s Xvfb + browser launch).
|
||||||
|
# Pool size is controlled via BROWSER_POOL_SIZE env var (default: 2).
|
||||||
|
import threading as _threading
|
||||||
|
from app.platforms.ebay.browser_pool import get_pool as _get_browser_pool
|
||||||
|
_browser_pool = _get_browser_pool()
|
||||||
|
_pool_thread = _threading.Thread(
|
||||||
|
target=_browser_pool.start, daemon=True, name="browser-pool-start"
|
||||||
|
)
|
||||||
|
_pool_thread.start()
|
||||||
|
log.info(
|
||||||
|
"BrowserPool: pre-warm started in background (BROWSER_POOL_SIZE=%s)",
|
||||||
|
os.environ.get("BROWSER_POOL_SIZE", "2"),
|
||||||
|
)
|
||||||
|
|
||||||
# Start vision/LLM background task scheduler.
|
# Start vision/LLM background task scheduler.
|
||||||
# background_tasks queue lives in shared_db (cloud) or local_db (local)
|
# background_tasks queue lives in shared_db (cloud) or local_db (local)
|
||||||
# so the scheduler has a single stable DB path across all cloud users.
|
# so the scheduler has a single stable DB path across all cloud users.
|
||||||
|
|
@ -204,6 +276,13 @@ async def _lifespan(app: FastAPI):
|
||||||
get_scheduler(sched_db).shutdown(timeout=10.0)
|
get_scheduler(sched_db).shutdown(timeout=10.0)
|
||||||
reset_scheduler()
|
reset_scheduler()
|
||||||
log.info("Snipe task scheduler stopped.")
|
log.info("Snipe task scheduler stopped.")
|
||||||
|
|
||||||
|
# Drain and close all pre-warmed browser pool slots.
|
||||||
|
try:
|
||||||
|
_browser_pool.stop()
|
||||||
|
except Exception:
|
||||||
|
log.warning("BrowserPool: error during shutdown", exc_info=True)
|
||||||
|
|
||||||
if _community_store is not None:
|
if _community_store is not None:
|
||||||
try:
|
try:
|
||||||
_community_store._db.close()
|
_community_store._db.close()
|
||||||
|
|
@ -633,6 +712,7 @@ def search(
|
||||||
must_exclude: str = "", # comma-separated; forwarded to eBay -term + client-side
|
must_exclude: str = "", # comma-separated; forwarded to eBay -term + client-side
|
||||||
category_id: str = "", # eBay category ID — forwarded to Browse API / scraper _sacat
|
category_id: str = "", # eBay category ID — forwarded to Browse API / scraper _sacat
|
||||||
adapter: str = "auto", # "auto" | "api" | "scraper" — override adapter selection
|
adapter: str = "auto", # "auto" | "api" | "scraper" — override adapter selection
|
||||||
|
refresh: bool = False, # when True, bypass cache read (still writes fresh result)
|
||||||
session: CloudUser = Depends(get_session),
|
session: CloudUser = Depends(get_session),
|
||||||
):
|
):
|
||||||
# If the user pasted an eBay listing or checkout URL, extract the item ID
|
# If the user pasted an eBay listing or checkout URL, extract the item ID
|
||||||
|
|
@ -685,6 +765,117 @@ def search(
|
||||||
shared_db = session.shared_db
|
shared_db = session.shared_db
|
||||||
user_db = session.user_db
|
user_db = session.user_db
|
||||||
|
|
||||||
|
# ── Cache lookup (synchronous endpoint) ──────────────────────────────────
|
||||||
|
cache_key = _cache_key(q, max_price, min_price, pages, must_include, must_include_mode, must_exclude, category_id)
|
||||||
|
|
||||||
|
cached_listings_dicts: "list | None" = None
|
||||||
|
cached_market_price: "float | None" = None
|
||||||
|
|
||||||
|
if not refresh:
|
||||||
|
cached = _search_result_cache.get(cache_key)
|
||||||
|
if cached is not None:
|
||||||
|
payload, expiry = cached
|
||||||
|
if expiry > _time.time():
|
||||||
|
log.info("cache: hit key=%s q=%r", cache_key, q)
|
||||||
|
cached_listings_dicts = payload["listings"]
|
||||||
|
cached_market_price = payload["market_price"]
|
||||||
|
|
||||||
|
if cached_listings_dicts is not None:
|
||||||
|
# Cache hit path: reconstruct listings as plain dicts (already serialised),
|
||||||
|
# re-run trust scorer against the local DB so per-user signals are fresh,
|
||||||
|
# and kick off background enrichment as normal.
|
||||||
|
import sqlite3 as _sqlite3
|
||||||
|
|
||||||
|
affiliate_active = bool(os.environ.get("EBAY_AFFILIATE_CAMPAIGN_ID", "").strip())
|
||||||
|
session_id = str(uuid.uuid4())
|
||||||
|
_update_queues[session_id] = _queue.SimpleQueue()
|
||||||
|
|
||||||
|
try:
|
||||||
|
shared_store = Store(shared_db)
|
||||||
|
user_store = Store(user_db)
|
||||||
|
|
||||||
|
# Re-hydrate Listing dataclass instances from the cached dicts so the
|
||||||
|
# scorer and DB calls receive proper typed objects.
|
||||||
|
from app.db.models import Listing as _Listing
|
||||||
|
listings = [_Listing(**d) for d in cached_listings_dicts]
|
||||||
|
|
||||||
|
# Re-save to user_store so staging fields are current for this session.
|
||||||
|
user_store.save_listings(listings)
|
||||||
|
staged = user_store.get_listings_staged("ebay", [l.platform_listing_id for l in listings])
|
||||||
|
listings = [staged.get(l.platform_listing_id, l) for l in listings]
|
||||||
|
|
||||||
|
# Fresh trust scores against local DB (not cached — user-specific).
|
||||||
|
scorer = TrustScorer(shared_store)
|
||||||
|
trust_scores_list = scorer.score_batch(listings, q)
|
||||||
|
user_store.save_trust_scores(trust_scores_list)
|
||||||
|
|
||||||
|
features = compute_features(session.tier)
|
||||||
|
if features.photo_analysis:
|
||||||
|
_enqueue_vision_tasks(listings, trust_scores_list, session)
|
||||||
|
|
||||||
|
trust_map = {
|
||||||
|
listing.platform_listing_id: dataclasses.asdict(ts)
|
||||||
|
for listing, ts in zip(listings, trust_scores_list)
|
||||||
|
if ts is not None
|
||||||
|
}
|
||||||
|
seller_map = {
|
||||||
|
listing.seller_platform_id: dataclasses.asdict(
|
||||||
|
shared_store.get_seller("ebay", listing.seller_platform_id)
|
||||||
|
)
|
||||||
|
for listing in listings
|
||||||
|
if listing.seller_platform_id
|
||||||
|
and shared_store.get_seller("ebay", listing.seller_platform_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
_is_unauthed = session.user_id == "anonymous" or session.user_id.startswith("guest:")
|
||||||
|
_pref_store = None if _is_unauthed else user_store
|
||||||
|
|
||||||
|
def _get_pref_cached(uid: Optional[str], path: str, default=None):
|
||||||
|
return _pref_store.get_user_preference(path, default=default) # type: ignore[union-attr]
|
||||||
|
|
||||||
|
def _serialize_listing_cached(l: object) -> dict:
|
||||||
|
d = dataclasses.asdict(l)
|
||||||
|
d["url"] = _wrap_affiliate_url(
|
||||||
|
d["url"],
|
||||||
|
retailer="ebay",
|
||||||
|
user_id=None if _is_unauthed else session.user_id,
|
||||||
|
get_preference=_get_pref_cached if _pref_store is not None else None,
|
||||||
|
)
|
||||||
|
return d
|
||||||
|
|
||||||
|
# Kick off BTF enrichment so live score updates still flow.
|
||||||
|
_trigger_scraper_enrichment(
|
||||||
|
listings, shared_store, shared_db,
|
||||||
|
user_db=user_db, query=comp_query, session_id=session_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"listings": [_serialize_listing_cached(l) for l in listings],
|
||||||
|
"trust_scores": trust_map,
|
||||||
|
"sellers": seller_map,
|
||||||
|
"market_price": cached_market_price,
|
||||||
|
"adapter_used": adapter_used,
|
||||||
|
"affiliate_active": affiliate_active,
|
||||||
|
"session_id": session_id,
|
||||||
|
}
|
||||||
|
|
||||||
|
except _sqlite3.OperationalError as e:
|
||||||
|
log.warning("search (cache hit) DB contention: %s", e)
|
||||||
|
_update_queues.pop(session_id, None)
|
||||||
|
return {
|
||||||
|
"listings": cached_listings_dicts,
|
||||||
|
"trust_scores": {},
|
||||||
|
"sellers": {},
|
||||||
|
"market_price": cached_market_price,
|
||||||
|
"adapter_used": adapter_used,
|
||||||
|
"affiliate_active": affiliate_active,
|
||||||
|
"session_id": None,
|
||||||
|
}
|
||||||
|
|
||||||
|
# ── Cache miss — run full scrape ─────────────────────────────────────────
|
||||||
|
_evict_expired_cache()
|
||||||
|
log.info("cache: miss key=%s q=%r", cache_key, q)
|
||||||
|
|
||||||
# Each thread creates its own Store — sqlite3 check_same_thread=True.
|
# Each thread creates its own Store — sqlite3 check_same_thread=True.
|
||||||
def _run_search(ebay_query: str) -> list:
|
def _run_search(ebay_query: str) -> list:
|
||||||
return _make_adapter(Store(shared_db), adapter).search(ebay_query, base_filters)
|
return _make_adapter(Store(shared_db), adapter).search(ebay_query, base_filters)
|
||||||
|
|
@ -796,6 +987,14 @@ def search(
|
||||||
comp = shared_store.get_market_comp("ebay", query_hash)
|
comp = shared_store.get_market_comp("ebay", query_hash)
|
||||||
market_price = comp.median_price if comp else None
|
market_price = comp.median_price if comp else None
|
||||||
|
|
||||||
|
# Store raw listings (as dicts) + market_price in cache.
|
||||||
|
# Trust scores and seller enrichment are intentionally excluded — they
|
||||||
|
# incorporate per-user signals and must be computed fresh each time.
|
||||||
|
_search_result_cache[cache_key] = (
|
||||||
|
{"listings": [dataclasses.asdict(l) for l in listings], "market_price": market_price},
|
||||||
|
_time.time() + _SEARCH_CACHE_TTL,
|
||||||
|
)
|
||||||
|
|
||||||
# Serialize — keyed by platform_listing_id for easy Vue lookup
|
# Serialize — keyed by platform_listing_id for easy Vue lookup
|
||||||
trust_map = {
|
trust_map = {
|
||||||
listing.platform_listing_id: dataclasses.asdict(ts)
|
listing.platform_listing_id: dataclasses.asdict(ts)
|
||||||
|
|
@ -873,6 +1072,7 @@ def search_async(
|
||||||
must_exclude: str = "",
|
must_exclude: str = "",
|
||||||
category_id: str = "",
|
category_id: str = "",
|
||||||
adapter: str = "auto",
|
adapter: str = "auto",
|
||||||
|
refresh: bool = False, # when True, bypass cache read (still writes fresh result)
|
||||||
session: CloudUser = Depends(get_session),
|
session: CloudUser = Depends(get_session),
|
||||||
):
|
):
|
||||||
"""Async variant of GET /api/search.
|
"""Async variant of GET /api/search.
|
||||||
|
|
@ -923,10 +1123,11 @@ def search_async(
|
||||||
_tier = session.tier
|
_tier = session.tier
|
||||||
_user_id = session.user_id
|
_user_id = session.user_id
|
||||||
_affiliate_active = bool(os.environ.get("EBAY_AFFILIATE_CAMPAIGN_ID", "").strip())
|
_affiliate_active = bool(os.environ.get("EBAY_AFFILIATE_CAMPAIGN_ID", "").strip())
|
||||||
|
_refresh = refresh # capture before the closure is dispatched
|
||||||
|
|
||||||
def _background_search() -> None:
|
def _background_search() -> None:
|
||||||
"""Run the full search pipeline and push SSE events to the queue."""
|
"""Run the full search pipeline and push SSE events to the queue."""
|
||||||
import hashlib as _hashlib
|
import hashlib as _hashlib_local
|
||||||
import sqlite3 as _sqlite3
|
import sqlite3 as _sqlite3
|
||||||
|
|
||||||
q_norm = q # captured from outer scope
|
q_norm = q # captured from outer scope
|
||||||
|
|
@ -965,6 +1166,100 @@ def search_async(
|
||||||
if sq is not None:
|
if sq is not None:
|
||||||
sq.put(event)
|
sq.put(event)
|
||||||
|
|
||||||
|
# ── Cache lookup (async background worker) ────────────────────────────
|
||||||
|
async_cache_key = _cache_key(
|
||||||
|
q_norm, max_price, min_price, pages,
|
||||||
|
must_include, must_include_mode, must_exclude, category_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
if not _refresh:
|
||||||
|
cached = _search_result_cache.get(async_cache_key)
|
||||||
|
if cached is not None:
|
||||||
|
payload, expiry = cached
|
||||||
|
if expiry > _time.time():
|
||||||
|
log.info("cache: hit key=%s q=%r", async_cache_key, q_norm)
|
||||||
|
from app.db.models import Listing as _Listing
|
||||||
|
cached_listings_raw = payload["listings"]
|
||||||
|
cached_market_price = payload["market_price"]
|
||||||
|
try:
|
||||||
|
shared_store = Store(_shared_db)
|
||||||
|
user_store = Store(_user_db)
|
||||||
|
listings = [_Listing(**d) for d in cached_listings_raw]
|
||||||
|
user_store.save_listings(listings)
|
||||||
|
staged = user_store.get_listings_staged(
|
||||||
|
"ebay", [l.platform_listing_id for l in listings]
|
||||||
|
)
|
||||||
|
listings = [staged.get(l.platform_listing_id, l) for l in listings]
|
||||||
|
|
||||||
|
scorer = TrustScorer(shared_store)
|
||||||
|
trust_scores_list = scorer.score_batch(listings, q_norm)
|
||||||
|
user_store.save_trust_scores(trust_scores_list)
|
||||||
|
|
||||||
|
features_obj = compute_features(_tier)
|
||||||
|
if features_obj.photo_analysis:
|
||||||
|
from api.cloud_session import CloudUser as _CloudUser
|
||||||
|
_sess_stub = _CloudUser(
|
||||||
|
user_id=_user_id, tier=_tier,
|
||||||
|
shared_db=_shared_db, user_db=_user_db,
|
||||||
|
)
|
||||||
|
_enqueue_vision_tasks(listings, trust_scores_list, _sess_stub)
|
||||||
|
|
||||||
|
trust_map = {
|
||||||
|
listing.platform_listing_id: dataclasses.asdict(ts)
|
||||||
|
for listing, ts in zip(listings, trust_scores_list)
|
||||||
|
if ts is not None
|
||||||
|
}
|
||||||
|
seller_map = {
|
||||||
|
listing.seller_platform_id: dataclasses.asdict(
|
||||||
|
shared_store.get_seller("ebay", listing.seller_platform_id)
|
||||||
|
)
|
||||||
|
for listing in listings
|
||||||
|
if listing.seller_platform_id
|
||||||
|
and shared_store.get_seller("ebay", listing.seller_platform_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
_is_unauthed = _user_id == "anonymous" or _user_id.startswith("guest:")
|
||||||
|
_pref_store_hit = None if _is_unauthed else user_store
|
||||||
|
|
||||||
|
def _get_pref_hit(uid: Optional[str], path: str, default=None):
|
||||||
|
return _pref_store_hit.get_user_preference(path, default=default) # type: ignore[union-attr]
|
||||||
|
|
||||||
|
def _serialize_hit(l: object) -> dict:
|
||||||
|
d = dataclasses.asdict(l)
|
||||||
|
d["url"] = _wrap_affiliate_url(
|
||||||
|
d["url"],
|
||||||
|
retailer="ebay",
|
||||||
|
user_id=None if _is_unauthed else _user_id,
|
||||||
|
get_preference=_get_pref_hit if _pref_store_hit is not None else None,
|
||||||
|
)
|
||||||
|
return d
|
||||||
|
|
||||||
|
_push({
|
||||||
|
"type": "listings",
|
||||||
|
"listings": [_serialize_hit(l) for l in listings],
|
||||||
|
"trust_scores": trust_map,
|
||||||
|
"sellers": seller_map,
|
||||||
|
"market_price": cached_market_price,
|
||||||
|
"adapter_used": adapter_used,
|
||||||
|
"affiliate_active": _affiliate_active,
|
||||||
|
"session_id": session_id,
|
||||||
|
})
|
||||||
|
# Enrichment still runs so live score updates flow.
|
||||||
|
_trigger_scraper_enrichment(
|
||||||
|
listings, shared_store, _shared_db,
|
||||||
|
user_db=_user_db, query=comp_query, session_id=session_id,
|
||||||
|
)
|
||||||
|
return # done — no scraping needed
|
||||||
|
except Exception as exc:
|
||||||
|
log.warning(
|
||||||
|
"cache hit path failed, falling through to scrape: %s", exc
|
||||||
|
)
|
||||||
|
# Fall through to full scrape below.
|
||||||
|
|
||||||
|
# ── Cache miss — evict stale entries, then scrape ─────────────────────
|
||||||
|
_evict_expired_cache()
|
||||||
|
log.info("cache: miss key=%s q=%r", async_cache_key, q_norm)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
def _run_search(ebay_query: str) -> list:
|
def _run_search(ebay_query: str) -> list:
|
||||||
return _make_adapter(Store(_shared_db), adapter).search(ebay_query, base_filters)
|
return _make_adapter(Store(_shared_db), adapter).search(ebay_query, base_filters)
|
||||||
|
|
@ -1038,10 +1333,16 @@ def search_async(
|
||||||
if features_obj.photo_analysis:
|
if features_obj.photo_analysis:
|
||||||
_enqueue_vision_tasks(listings, trust_scores_list, _session_stub)
|
_enqueue_vision_tasks(listings, trust_scores_list, _session_stub)
|
||||||
|
|
||||||
query_hash = _hashlib.md5(comp_query.encode()).hexdigest()
|
query_hash = _hashlib_local.md5(comp_query.encode()).hexdigest()
|
||||||
comp = shared_store.get_market_comp("ebay", query_hash)
|
comp = shared_store.get_market_comp("ebay", query_hash)
|
||||||
market_price = comp.median_price if comp else None
|
market_price = comp.median_price if comp else None
|
||||||
|
|
||||||
|
# Store raw listings + market_price in cache (trust scores excluded).
|
||||||
|
_search_result_cache[async_cache_key] = (
|
||||||
|
{"listings": [dataclasses.asdict(l) for l in listings], "market_price": market_price},
|
||||||
|
_time.time() + _SEARCH_CACHE_TTL,
|
||||||
|
)
|
||||||
|
|
||||||
trust_map = {
|
trust_map = {
|
||||||
listing.platform_listing_id: dataclasses.asdict(ts)
|
listing.platform_listing_id: dataclasses.asdict(ts)
|
||||||
for listing, ts in zip(listings, trust_scores_list)
|
for listing, ts in zip(listings, trust_scores_list)
|
||||||
|
|
|
||||||
402
tests/test_search_cache.py
Normal file
402
tests/test_search_cache.py
Normal file
|
|
@ -0,0 +1,402 @@
|
||||||
|
"""Tests for the short-TTL search result cache in api/main.py.
|
||||||
|
|
||||||
|
Covers:
|
||||||
|
- _cache_key stability (same inputs → same key)
|
||||||
|
- _cache_key uniqueness (different inputs → different keys)
|
||||||
|
- cache hit path returns early without scraping (async worker)
|
||||||
|
- cache miss path stores result in _search_result_cache
|
||||||
|
- refresh=True bypasses cache read (still writes fresh result)
|
||||||
|
- TTL expiry: expired entries are not returned as hits
|
||||||
|
- _evict_expired_cache removes expired entries
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import os
|
||||||
|
import queue as _queue
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
|
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _clear_cache():
|
||||||
|
"""Reset module-level cache state between tests."""
|
||||||
|
import api.main as _main
|
||||||
|
_main._search_result_cache.clear()
|
||||||
|
_main._last_eviction_ts = 0.0
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def isolated_cache():
|
||||||
|
"""Ensure each test starts with an empty cache."""
|
||||||
|
_clear_cache()
|
||||||
|
yield
|
||||||
|
_clear_cache()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def client(tmp_path):
|
||||||
|
"""TestClient backed by a fresh tmp DB."""
|
||||||
|
os.environ["SNIPE_DB"] = str(tmp_path / "snipe.db")
|
||||||
|
from api.main import app
|
||||||
|
from fastapi.testclient import TestClient
|
||||||
|
return TestClient(app, raise_server_exceptions=False)
|
||||||
|
|
||||||
|
|
||||||
|
def _make_mock_listing(listing_id: str = "123456789", seller_id: str = "test_seller"):
|
||||||
|
"""Return a MagicMock listing (for use where asdict() is NOT called on it)."""
|
||||||
|
m = MagicMock()
|
||||||
|
m.platform_listing_id = listing_id
|
||||||
|
m.seller_platform_id = seller_id
|
||||||
|
m.title = "Test GPU"
|
||||||
|
m.price = 100.0
|
||||||
|
m.currency = "USD"
|
||||||
|
m.condition = "Used"
|
||||||
|
m.url = f"https://www.ebay.com/itm/{listing_id}"
|
||||||
|
m.photo_urls = []
|
||||||
|
m.listing_age_days = 5
|
||||||
|
m.buying_format = "fixed_price"
|
||||||
|
m.ends_at = None
|
||||||
|
m.fetched_at = None
|
||||||
|
m.trust_score_id = None
|
||||||
|
m.id = 1
|
||||||
|
m.category_name = None
|
||||||
|
return m
|
||||||
|
|
||||||
|
|
||||||
|
def _make_real_listing(listing_id: str = "123456789", seller_id: str = "test_seller"):
|
||||||
|
"""Return a real Listing dataclass instance (for use where asdict() is called)."""
|
||||||
|
from app.db.models import Listing
|
||||||
|
return Listing(
|
||||||
|
platform="ebay",
|
||||||
|
platform_listing_id=listing_id,
|
||||||
|
title="Test GPU",
|
||||||
|
price=100.0,
|
||||||
|
currency="USD",
|
||||||
|
condition="Used",
|
||||||
|
seller_platform_id=seller_id,
|
||||||
|
url=f"https://www.ebay.com/itm/{listing_id}",
|
||||||
|
photo_urls=[],
|
||||||
|
listing_age_days=5,
|
||||||
|
buying_format="fixed_price",
|
||||||
|
id=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ── _cache_key unit tests ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def test_cache_key_stable_for_same_inputs():
|
||||||
|
"""The same parameter set always produces the same key."""
|
||||||
|
from api.main import _cache_key
|
||||||
|
k1 = _cache_key("rtx 3080", 400.0, 100.0, 2, "rtx,3080", "all", "mining", "27386")
|
||||||
|
k2 = _cache_key("rtx 3080", 400.0, 100.0, 2, "rtx,3080", "all", "mining", "27386")
|
||||||
|
assert k1 == k2
|
||||||
|
|
||||||
|
|
||||||
|
def test_cache_key_case_normalised():
|
||||||
|
"""Query is normalised to lower-case + stripped before hashing."""
|
||||||
|
from api.main import _cache_key
|
||||||
|
k1 = _cache_key("RTX 3080", None, None, 1, "", "all", "", "")
|
||||||
|
k2 = _cache_key("rtx 3080", None, None, 1, "", "all", "", "")
|
||||||
|
assert k1 == k2
|
||||||
|
|
||||||
|
|
||||||
|
def test_cache_key_differs_on_query_change():
|
||||||
|
"""Different query strings must produce different keys."""
|
||||||
|
from api.main import _cache_key
|
||||||
|
k1 = _cache_key("rtx 3080", None, None, 1, "", "all", "", "")
|
||||||
|
k2 = _cache_key("gtx 1080", None, None, 1, "", "all", "", "")
|
||||||
|
assert k1 != k2
|
||||||
|
|
||||||
|
|
||||||
|
def test_cache_key_differs_on_price_filter():
|
||||||
|
"""Different max_price must produce a different key."""
|
||||||
|
from api.main import _cache_key
|
||||||
|
k1 = _cache_key("gpu", 400.0, None, 1, "", "all", "", "")
|
||||||
|
k2 = _cache_key("gpu", 500.0, None, 1, "", "all", "", "")
|
||||||
|
assert k1 != k2
|
||||||
|
|
||||||
|
|
||||||
|
def test_cache_key_differs_on_min_price():
|
||||||
|
"""Different min_price must produce a different key."""
|
||||||
|
from api.main import _cache_key
|
||||||
|
k1 = _cache_key("gpu", None, 50.0, 1, "", "all", "", "")
|
||||||
|
k2 = _cache_key("gpu", None, 100.0, 1, "", "all", "", "")
|
||||||
|
assert k1 != k2
|
||||||
|
|
||||||
|
|
||||||
|
def test_cache_key_differs_on_pages():
|
||||||
|
"""Different page count must produce a different key."""
|
||||||
|
from api.main import _cache_key
|
||||||
|
k1 = _cache_key("gpu", None, None, 1, "", "all", "", "")
|
||||||
|
k2 = _cache_key("gpu", None, None, 2, "", "all", "", "")
|
||||||
|
assert k1 != k2
|
||||||
|
|
||||||
|
|
||||||
|
def test_cache_key_differs_on_must_include():
|
||||||
|
"""Different must_include terms must produce a different key."""
|
||||||
|
from api.main import _cache_key
|
||||||
|
k1 = _cache_key("gpu", None, None, 1, "rtx", "all", "", "")
|
||||||
|
k2 = _cache_key("gpu", None, None, 1, "gtx", "all", "", "")
|
||||||
|
assert k1 != k2
|
||||||
|
|
||||||
|
|
||||||
|
def test_cache_key_differs_on_must_exclude():
|
||||||
|
"""Different must_exclude terms must produce a different key."""
|
||||||
|
from api.main import _cache_key
|
||||||
|
k1 = _cache_key("gpu", None, None, 1, "", "all", "mining", "")
|
||||||
|
k2 = _cache_key("gpu", None, None, 1, "", "all", "defective", "")
|
||||||
|
assert k1 != k2
|
||||||
|
|
||||||
|
|
||||||
|
def test_cache_key_differs_on_category_id():
|
||||||
|
"""Different category_id must produce a different key."""
|
||||||
|
from api.main import _cache_key
|
||||||
|
k1 = _cache_key("gpu", None, None, 1, "", "all", "", "27386")
|
||||||
|
k2 = _cache_key("gpu", None, None, 1, "", "all", "", "12345")
|
||||||
|
assert k1 != k2
|
||||||
|
|
||||||
|
|
||||||
|
def test_cache_key_is_16_chars():
|
||||||
|
"""Key must be exactly 16 hex characters."""
|
||||||
|
from api.main import _cache_key
|
||||||
|
k = _cache_key("gpu", None, None, 1, "", "all", "", "")
|
||||||
|
assert len(k) == 16
|
||||||
|
assert all(c in "0123456789abcdef" for c in k)
|
||||||
|
|
||||||
|
|
||||||
|
# ── TTL / eviction unit tests ─────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def test_expired_entry_is_not_returned_as_hit():
|
||||||
|
"""An entry past its TTL must not be treated as a cache hit."""
|
||||||
|
import api.main as _main
|
||||||
|
from api.main import _cache_key
|
||||||
|
|
||||||
|
key = _cache_key("gpu", None, None, 1, "", "all", "", "")
|
||||||
|
# Write an already-expired entry.
|
||||||
|
_main._search_result_cache[key] = (
|
||||||
|
{"listings": [], "market_price": None},
|
||||||
|
time.time() - 1.0, # expired 1 second ago
|
||||||
|
)
|
||||||
|
|
||||||
|
cached = _main._search_result_cache.get(key)
|
||||||
|
assert cached is not None
|
||||||
|
payload, expiry = cached
|
||||||
|
# Simulate the hit-check used in main.py
|
||||||
|
assert expiry <= time.time(), "Entry should be expired"
|
||||||
|
|
||||||
|
|
||||||
|
def test_evict_expired_cache_removes_stale_entries():
|
||||||
|
"""_evict_expired_cache must remove entries whose expiry has passed."""
|
||||||
|
import api.main as _main
|
||||||
|
from api.main import _cache_key, _evict_expired_cache
|
||||||
|
|
||||||
|
key_expired = _cache_key("old query", None, None, 1, "", "all", "", "")
|
||||||
|
key_valid = _cache_key("new query", None, None, 1, "", "all", "", "")
|
||||||
|
|
||||||
|
_main._search_result_cache[key_expired] = (
|
||||||
|
{"listings": [], "market_price": None},
|
||||||
|
time.time() - 10.0, # already expired
|
||||||
|
)
|
||||||
|
_main._search_result_cache[key_valid] = (
|
||||||
|
{"listings": [], "market_price": 99.0},
|
||||||
|
time.time() + 300.0, # valid for 5 min
|
||||||
|
)
|
||||||
|
|
||||||
|
# Reset throttle so eviction runs immediately.
|
||||||
|
_main._last_eviction_ts = 0.0
|
||||||
|
_evict_expired_cache()
|
||||||
|
|
||||||
|
assert key_expired not in _main._search_result_cache
|
||||||
|
assert key_valid in _main._search_result_cache
|
||||||
|
|
||||||
|
|
||||||
|
def test_evict_is_rate_limited():
|
||||||
|
"""_evict_expired_cache should skip eviction if called within 60 s."""
|
||||||
|
import api.main as _main
|
||||||
|
from api.main import _cache_key, _evict_expired_cache
|
||||||
|
|
||||||
|
key_expired = _cache_key("stale", None, None, 1, "", "all", "", "")
|
||||||
|
_main._search_result_cache[key_expired] = (
|
||||||
|
{"listings": [], "market_price": None},
|
||||||
|
time.time() - 5.0,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Pretend eviction just ran.
|
||||||
|
_main._last_eviction_ts = time.time()
|
||||||
|
_evict_expired_cache()
|
||||||
|
|
||||||
|
# Entry should still be present because eviction was throttled.
|
||||||
|
assert key_expired in _main._search_result_cache
|
||||||
|
|
||||||
|
|
||||||
|
# ── Integration tests — async endpoint cache hit ──────────────────────────────
|
||||||
|
|
||||||
|
def test_async_cache_hit_skips_scraper(client, tmp_path):
    """On a warm cache hit the scraper adapter must not be called."""
    import threading

    import api.main as _main

    from api.main import _cache_key

    # Pre-seed a valid cache entry for the exact key the endpoint computes.
    key = _cache_key("rtx 3080", None, None, 1, "", "all", "", "")
    _main._search_result_cache[key] = (
        {"listings": [], "market_price": 250.0},
        time.time() + 300.0,
    )

    scraper_called = threading.Event()

    def _fake_search(query, filters):
        scraper_called.set()
        return []

    try:
        with (
            patch("api.main._make_adapter") as mock_adapter_factory,
            patch("api.main._trigger_scraper_enrichment"),
            patch("api.main.TrustScorer") as mock_scorer_cls,
            patch("api.main.Store") as mock_store_cls,
        ):
            mock_adapter = MagicMock()
            mock_adapter.search.side_effect = _fake_search
            mock_adapter.get_completed_sales.return_value = None
            mock_adapter_factory.return_value = mock_adapter

            mock_scorer = MagicMock()
            mock_scorer.score_batch.return_value = []
            mock_scorer_cls.return_value = mock_scorer

            mock_store = MagicMock()
            mock_store.get_listings_staged.return_value = {}
            mock_store.refresh_seller_categories.return_value = 0
            mock_store.save_listings.return_value = None
            mock_store.save_trust_scores.return_value = None
            mock_store.get_market_comp.return_value = None
            mock_store.get_seller.return_value = None
            mock_store.get_user_preference.return_value = None
            mock_store_cls.return_value = mock_store

            resp = client.get("/api/search/async?q=rtx+3080")
            assert resp.status_code == 202

            # Give the background worker a moment to run.
            scraper_called.wait(timeout=3.0)

            # Scraper must NOT have been called on a cache hit.
            assert not scraper_called.is_set(), (
                "Scraper was called despite a warm cache hit"
            )
    finally:
        # Drop the seeded entry so later tests using the same query key
        # (e.g. the cache-miss test) start from a cold cache.
        _main._search_result_cache.pop(key, None)
|
||||||
|
|
||||||
|
|
||||||
|
def test_async_cache_miss_stores_result(client, tmp_path):
    """After a cache miss the result must be stored in _search_result_cache."""
    import threading

    import api.main as _main

    from api.main import _cache_key

    # Compute the key up front and make sure the cache is genuinely cold —
    # a stale entry left behind by another test would silently exercise the
    # hit path and let the assertions below pass without testing anything.
    key = _cache_key("rtx 3080", None, None, 1, "", "all", "", "")
    _main._search_result_cache.pop(key, None)

    search_done = threading.Event()
    real_listing = _make_real_listing()

    def _fake_search(query, filters):
        return [real_listing]

    try:
        with (
            patch("api.main._make_adapter") as mock_adapter_factory,
            patch("api.main._trigger_scraper_enrichment") as mock_enrich,
            patch("api.main.TrustScorer") as mock_scorer_cls,
            patch("api.main.Store") as mock_store_cls,
        ):
            mock_adapter = MagicMock()
            mock_adapter.search.side_effect = _fake_search
            mock_adapter.get_completed_sales.return_value = None
            mock_adapter_factory.return_value = mock_adapter

            mock_scorer = MagicMock()
            mock_scorer.score_batch.return_value = []
            mock_scorer_cls.return_value = mock_scorer

            mock_store = MagicMock()
            mock_store.get_listings_staged.return_value = {
                real_listing.platform_listing_id: real_listing
            }
            mock_store.refresh_seller_categories.return_value = 0
            mock_store.save_listings.return_value = None
            mock_store.save_trust_scores.return_value = None
            mock_store.get_market_comp.return_value = None
            mock_store.get_seller.return_value = None
            mock_store.get_user_preference.return_value = None
            mock_store_cls.return_value = mock_store

            def _enrich_side_effect(*args, **kwargs):
                search_done.set()

            mock_enrich.side_effect = _enrich_side_effect

            resp = client.get("/api/search/async?q=rtx+3080")
            assert resp.status_code == 202

            # Wait until the background worker reaches _trigger_scraper_enrichment.
            search_done.wait(timeout=5.0)

            assert search_done.is_set(), "Background search worker never completed"

            assert key in _main._search_result_cache, (
                "Result was not stored in cache after miss"
            )
            payload, expiry = _main._search_result_cache[key]
            assert expiry > time.time(), "Cache entry has already expired"
            assert "listings" in payload
    finally:
        # Leave no entry behind for subsequent tests.
        _main._search_result_cache.pop(key, None)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Integration tests — async endpoint refresh=True ──────────────────────────
|
||||||
|
|
||||||
|
def test_async_refresh_bypasses_cache_read(client, tmp_path):
    """refresh=True must bypass cache read and invoke the scraper."""
    import threading

    import api.main as _main

    from api.main import _cache_key

    # Seed a valid cache entry so we can confirm it is bypassed.
    key = _cache_key("rtx 3080", None, None, 1, "", "all", "", "")
    _main._search_result_cache[key] = (
        {"listings": [], "market_price": 100.0},
        time.time() + 300.0,
    )

    scraper_called = threading.Event()

    def _fake_search(query, filters):
        scraper_called.set()
        return []

    try:
        with (
            patch("api.main._make_adapter") as mock_adapter_factory,
            patch("api.main._trigger_scraper_enrichment"),
            patch("api.main.TrustScorer") as mock_scorer_cls,
            patch("api.main.Store") as mock_store_cls,
        ):
            mock_adapter = MagicMock()
            mock_adapter.search.side_effect = _fake_search
            mock_adapter.get_completed_sales.return_value = None
            mock_adapter_factory.return_value = mock_adapter

            mock_scorer = MagicMock()
            mock_scorer.score_batch.return_value = []
            mock_scorer_cls.return_value = mock_scorer

            mock_store = MagicMock()
            mock_store.get_listings_staged.return_value = {}
            mock_store.refresh_seller_categories.return_value = 0
            mock_store.save_listings.return_value = None
            mock_store.save_trust_scores.return_value = None
            mock_store.get_market_comp.return_value = None
            mock_store.get_seller.return_value = None
            mock_store.get_user_preference.return_value = None
            mock_store_cls.return_value = mock_store

            resp = client.get("/api/search/async?q=rtx+3080&refresh=true")
            assert resp.status_code == 202

            scraper_called.wait(timeout=5.0)

            assert scraper_called.is_set(), (
                "Scraper was not called even though refresh=True"
            )
    finally:
        # Remove the seeded entry so module-level cache state does not
        # leak into other tests sharing the same query key.
        _main._search_result_cache.pop(key, None)
|
||||||
Loading…
Reference in a new issue