Compare commits
No commits in common. "29d2033ef23caca82fbca9e043c79451e49ae157" and "844721c6fdc253594bfcac49c595a58b78bfa80d" have entirely different histories.
29d2033ef2
...
844721c6fd
2 changed files with 2 additions and 705 deletions
305
api/main.py
305
api/main.py
|
|
@ -5,14 +5,12 @@ import asyncio
|
||||||
import csv
|
import csv
|
||||||
import dataclasses
|
import dataclasses
|
||||||
import hashlib
|
import hashlib
|
||||||
import hashlib as _hashlib
|
|
||||||
import io
|
import io
|
||||||
import json as _json
|
import json as _json
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import queue as _queue
|
import queue as _queue
|
||||||
import re
|
import re
|
||||||
import time as _time
|
|
||||||
import uuid
|
import uuid
|
||||||
from concurrent.futures import ThreadPoolExecutor
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
|
|
@ -77,61 +75,6 @@ def _auth_label(user_id: str) -> str:
|
||||||
_update_queues: dict[str, _queue.SimpleQueue] = {}
|
_update_queues: dict[str, _queue.SimpleQueue] = {}
|
||||||
|
|
||||||
|
|
||||||
# ── Short-TTL search result cache ────────────────────────────────────────────
|
|
||||||
# Caches raw eBay listings and market_price only — trust scores are NOT cached
|
|
||||||
# because they incorporate per-user signals (zero_feedback cap, etc.).
|
|
||||||
# On cache hit the trust scorer and seller lookups run against the local DB as
|
|
||||||
# normal; only the expensive Playwright/Browse API scrape is skipped.
|
|
||||||
#
|
|
||||||
# TTL is configurable via SEARCH_CACHE_TTL_S (default 300 s = 5 min).
|
|
||||||
# Listings are public eBay data — safe to share across all users.
|
|
||||||
|
|
||||||
_SEARCH_CACHE_TTL = int(os.environ.get("SEARCH_CACHE_TTL_S", "300"))
|
|
||||||
|
|
||||||
# key → ({"listings": [...raw dicts...], "market_price": float|None}, expiry_ts)
|
|
||||||
_search_result_cache: dict[str, tuple[dict, float]] = {}
|
|
||||||
|
|
||||||
# Throttle eviction sweeps to at most once per 60 s.
|
|
||||||
_last_eviction_ts: float = 0.0
|
|
||||||
|
|
||||||
|
|
||||||
def _cache_key(
|
|
||||||
q: str,
|
|
||||||
max_price: "float | None",
|
|
||||||
min_price: "float | None",
|
|
||||||
pages: int,
|
|
||||||
must_include: str,
|
|
||||||
must_include_mode: str,
|
|
||||||
must_exclude: str,
|
|
||||||
category_id: str,
|
|
||||||
) -> str:
|
|
||||||
"""Stable 16-char hex key for a search param set. Query is lower-cased + stripped."""
|
|
||||||
raw = (
|
|
||||||
f"{q.lower().strip()}|{max_price}|{min_price}|{pages}"
|
|
||||||
f"|{must_include.lower().strip()}|{must_include_mode}"
|
|
||||||
f"|{must_exclude.lower().strip()}|{category_id.strip()}"
|
|
||||||
)
|
|
||||||
return _hashlib.sha256(raw.encode()).hexdigest()[:16]
|
|
||||||
|
|
||||||
|
|
||||||
def _evict_expired_cache() -> None:
|
|
||||||
"""Remove stale entries from _search_result_cache.
|
|
||||||
|
|
||||||
Called opportunistically on each cache miss; rate-limited to once per 60 s
|
|
||||||
to avoid quadratic overhead when many concurrent misses arrive at once.
|
|
||||||
"""
|
|
||||||
global _last_eviction_ts
|
|
||||||
now = _time.time()
|
|
||||||
if now - _last_eviction_ts < 60.0:
|
|
||||||
return
|
|
||||||
_last_eviction_ts = now
|
|
||||||
expired = [k for k, (_, exp) in _search_result_cache.items() if exp <= now]
|
|
||||||
for k in expired:
|
|
||||||
_search_result_cache.pop(k, None)
|
|
||||||
if expired:
|
|
||||||
log.debug("cache: evicted %d expired entries", len(expired))
|
|
||||||
|
|
||||||
|
|
||||||
# ── Community DB (optional — only active when COMMUNITY_DB_URL is set) ────────
|
# ── Community DB (optional — only active when COMMUNITY_DB_URL is set) ────────
|
||||||
# Holds SnipeCommunityStore at module level so endpoints can publish signals
|
# Holds SnipeCommunityStore at module level so endpoints can publish signals
|
||||||
# without constructing a new connection pool on every request.
|
# without constructing a new connection pool on every request.
|
||||||
|
|
@ -154,21 +97,6 @@ def _get_query_translator():
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def _lifespan(app: FastAPI):
|
async def _lifespan(app: FastAPI):
|
||||||
global _community_store
|
global _community_store
|
||||||
# Pre-warm the Chromium browser pool so the first scrape request does not
|
|
||||||
# pay the full cold-start cost (5-10s Xvfb + browser launch).
|
|
||||||
# Pool size is controlled via BROWSER_POOL_SIZE env var (default: 2).
|
|
||||||
import threading as _threading
|
|
||||||
from app.platforms.ebay.browser_pool import get_pool as _get_browser_pool
|
|
||||||
_browser_pool = _get_browser_pool()
|
|
||||||
_pool_thread = _threading.Thread(
|
|
||||||
target=_browser_pool.start, daemon=True, name="browser-pool-start"
|
|
||||||
)
|
|
||||||
_pool_thread.start()
|
|
||||||
log.info(
|
|
||||||
"BrowserPool: pre-warm started in background (BROWSER_POOL_SIZE=%s)",
|
|
||||||
os.environ.get("BROWSER_POOL_SIZE", "2"),
|
|
||||||
)
|
|
||||||
|
|
||||||
# Start vision/LLM background task scheduler.
|
# Start vision/LLM background task scheduler.
|
||||||
# background_tasks queue lives in shared_db (cloud) or local_db (local)
|
# background_tasks queue lives in shared_db (cloud) or local_db (local)
|
||||||
# so the scheduler has a single stable DB path across all cloud users.
|
# so the scheduler has a single stable DB path across all cloud users.
|
||||||
|
|
@ -276,13 +204,6 @@ async def _lifespan(app: FastAPI):
|
||||||
get_scheduler(sched_db).shutdown(timeout=10.0)
|
get_scheduler(sched_db).shutdown(timeout=10.0)
|
||||||
reset_scheduler()
|
reset_scheduler()
|
||||||
log.info("Snipe task scheduler stopped.")
|
log.info("Snipe task scheduler stopped.")
|
||||||
|
|
||||||
# Drain and close all pre-warmed browser pool slots.
|
|
||||||
try:
|
|
||||||
_browser_pool.stop()
|
|
||||||
except Exception:
|
|
||||||
log.warning("BrowserPool: error during shutdown", exc_info=True)
|
|
||||||
|
|
||||||
if _community_store is not None:
|
if _community_store is not None:
|
||||||
try:
|
try:
|
||||||
_community_store._db.close()
|
_community_store._db.close()
|
||||||
|
|
@ -712,7 +633,6 @@ def search(
|
||||||
must_exclude: str = "", # comma-separated; forwarded to eBay -term + client-side
|
must_exclude: str = "", # comma-separated; forwarded to eBay -term + client-side
|
||||||
category_id: str = "", # eBay category ID — forwarded to Browse API / scraper _sacat
|
category_id: str = "", # eBay category ID — forwarded to Browse API / scraper _sacat
|
||||||
adapter: str = "auto", # "auto" | "api" | "scraper" — override adapter selection
|
adapter: str = "auto", # "auto" | "api" | "scraper" — override adapter selection
|
||||||
refresh: bool = False, # when True, bypass cache read (still writes fresh result)
|
|
||||||
session: CloudUser = Depends(get_session),
|
session: CloudUser = Depends(get_session),
|
||||||
):
|
):
|
||||||
# If the user pasted an eBay listing or checkout URL, extract the item ID
|
# If the user pasted an eBay listing or checkout URL, extract the item ID
|
||||||
|
|
@ -765,117 +685,6 @@ def search(
|
||||||
shared_db = session.shared_db
|
shared_db = session.shared_db
|
||||||
user_db = session.user_db
|
user_db = session.user_db
|
||||||
|
|
||||||
# ── Cache lookup (synchronous endpoint) ──────────────────────────────────
|
|
||||||
cache_key = _cache_key(q, max_price, min_price, pages, must_include, must_include_mode, must_exclude, category_id)
|
|
||||||
|
|
||||||
cached_listings_dicts: "list | None" = None
|
|
||||||
cached_market_price: "float | None" = None
|
|
||||||
|
|
||||||
if not refresh:
|
|
||||||
cached = _search_result_cache.get(cache_key)
|
|
||||||
if cached is not None:
|
|
||||||
payload, expiry = cached
|
|
||||||
if expiry > _time.time():
|
|
||||||
log.info("cache: hit key=%s q=%r", cache_key, q)
|
|
||||||
cached_listings_dicts = payload["listings"]
|
|
||||||
cached_market_price = payload["market_price"]
|
|
||||||
|
|
||||||
if cached_listings_dicts is not None:
|
|
||||||
# Cache hit path: reconstruct listings as plain dicts (already serialised),
|
|
||||||
# re-run trust scorer against the local DB so per-user signals are fresh,
|
|
||||||
# and kick off background enrichment as normal.
|
|
||||||
import sqlite3 as _sqlite3
|
|
||||||
|
|
||||||
affiliate_active = bool(os.environ.get("EBAY_AFFILIATE_CAMPAIGN_ID", "").strip())
|
|
||||||
session_id = str(uuid.uuid4())
|
|
||||||
_update_queues[session_id] = _queue.SimpleQueue()
|
|
||||||
|
|
||||||
try:
|
|
||||||
shared_store = Store(shared_db)
|
|
||||||
user_store = Store(user_db)
|
|
||||||
|
|
||||||
# Re-hydrate Listing dataclass instances from the cached dicts so the
|
|
||||||
# scorer and DB calls receive proper typed objects.
|
|
||||||
from app.db.models import Listing as _Listing
|
|
||||||
listings = [_Listing(**d) for d in cached_listings_dicts]
|
|
||||||
|
|
||||||
# Re-save to user_store so staging fields are current for this session.
|
|
||||||
user_store.save_listings(listings)
|
|
||||||
staged = user_store.get_listings_staged("ebay", [l.platform_listing_id for l in listings])
|
|
||||||
listings = [staged.get(l.platform_listing_id, l) for l in listings]
|
|
||||||
|
|
||||||
# Fresh trust scores against local DB (not cached — user-specific).
|
|
||||||
scorer = TrustScorer(shared_store)
|
|
||||||
trust_scores_list = scorer.score_batch(listings, q)
|
|
||||||
user_store.save_trust_scores(trust_scores_list)
|
|
||||||
|
|
||||||
features = compute_features(session.tier)
|
|
||||||
if features.photo_analysis:
|
|
||||||
_enqueue_vision_tasks(listings, trust_scores_list, session)
|
|
||||||
|
|
||||||
trust_map = {
|
|
||||||
listing.platform_listing_id: dataclasses.asdict(ts)
|
|
||||||
for listing, ts in zip(listings, trust_scores_list)
|
|
||||||
if ts is not None
|
|
||||||
}
|
|
||||||
seller_map = {
|
|
||||||
listing.seller_platform_id: dataclasses.asdict(
|
|
||||||
shared_store.get_seller("ebay", listing.seller_platform_id)
|
|
||||||
)
|
|
||||||
for listing in listings
|
|
||||||
if listing.seller_platform_id
|
|
||||||
and shared_store.get_seller("ebay", listing.seller_platform_id)
|
|
||||||
}
|
|
||||||
|
|
||||||
_is_unauthed = session.user_id == "anonymous" or session.user_id.startswith("guest:")
|
|
||||||
_pref_store = None if _is_unauthed else user_store
|
|
||||||
|
|
||||||
def _get_pref_cached(uid: Optional[str], path: str, default=None):
|
|
||||||
return _pref_store.get_user_preference(path, default=default) # type: ignore[union-attr]
|
|
||||||
|
|
||||||
def _serialize_listing_cached(l: object) -> dict:
|
|
||||||
d = dataclasses.asdict(l)
|
|
||||||
d["url"] = _wrap_affiliate_url(
|
|
||||||
d["url"],
|
|
||||||
retailer="ebay",
|
|
||||||
user_id=None if _is_unauthed else session.user_id,
|
|
||||||
get_preference=_get_pref_cached if _pref_store is not None else None,
|
|
||||||
)
|
|
||||||
return d
|
|
||||||
|
|
||||||
# Kick off BTF enrichment so live score updates still flow.
|
|
||||||
_trigger_scraper_enrichment(
|
|
||||||
listings, shared_store, shared_db,
|
|
||||||
user_db=user_db, query=comp_query, session_id=session_id,
|
|
||||||
)
|
|
||||||
|
|
||||||
return {
|
|
||||||
"listings": [_serialize_listing_cached(l) for l in listings],
|
|
||||||
"trust_scores": trust_map,
|
|
||||||
"sellers": seller_map,
|
|
||||||
"market_price": cached_market_price,
|
|
||||||
"adapter_used": adapter_used,
|
|
||||||
"affiliate_active": affiliate_active,
|
|
||||||
"session_id": session_id,
|
|
||||||
}
|
|
||||||
|
|
||||||
except _sqlite3.OperationalError as e:
|
|
||||||
log.warning("search (cache hit) DB contention: %s", e)
|
|
||||||
_update_queues.pop(session_id, None)
|
|
||||||
return {
|
|
||||||
"listings": cached_listings_dicts,
|
|
||||||
"trust_scores": {},
|
|
||||||
"sellers": {},
|
|
||||||
"market_price": cached_market_price,
|
|
||||||
"adapter_used": adapter_used,
|
|
||||||
"affiliate_active": affiliate_active,
|
|
||||||
"session_id": None,
|
|
||||||
}
|
|
||||||
|
|
||||||
# ── Cache miss — run full scrape ─────────────────────────────────────────
|
|
||||||
_evict_expired_cache()
|
|
||||||
log.info("cache: miss key=%s q=%r", cache_key, q)
|
|
||||||
|
|
||||||
# Each thread creates its own Store — sqlite3 check_same_thread=True.
|
# Each thread creates its own Store — sqlite3 check_same_thread=True.
|
||||||
def _run_search(ebay_query: str) -> list:
|
def _run_search(ebay_query: str) -> list:
|
||||||
return _make_adapter(Store(shared_db), adapter).search(ebay_query, base_filters)
|
return _make_adapter(Store(shared_db), adapter).search(ebay_query, base_filters)
|
||||||
|
|
@ -987,14 +796,6 @@ def search(
|
||||||
comp = shared_store.get_market_comp("ebay", query_hash)
|
comp = shared_store.get_market_comp("ebay", query_hash)
|
||||||
market_price = comp.median_price if comp else None
|
market_price = comp.median_price if comp else None
|
||||||
|
|
||||||
# Store raw listings (as dicts) + market_price in cache.
|
|
||||||
# Trust scores and seller enrichment are intentionally excluded — they
|
|
||||||
# incorporate per-user signals and must be computed fresh each time.
|
|
||||||
_search_result_cache[cache_key] = (
|
|
||||||
{"listings": [dataclasses.asdict(l) for l in listings], "market_price": market_price},
|
|
||||||
_time.time() + _SEARCH_CACHE_TTL,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Serialize — keyed by platform_listing_id for easy Vue lookup
|
# Serialize — keyed by platform_listing_id for easy Vue lookup
|
||||||
trust_map = {
|
trust_map = {
|
||||||
listing.platform_listing_id: dataclasses.asdict(ts)
|
listing.platform_listing_id: dataclasses.asdict(ts)
|
||||||
|
|
@ -1072,7 +873,6 @@ def search_async(
|
||||||
must_exclude: str = "",
|
must_exclude: str = "",
|
||||||
category_id: str = "",
|
category_id: str = "",
|
||||||
adapter: str = "auto",
|
adapter: str = "auto",
|
||||||
refresh: bool = False, # when True, bypass cache read (still writes fresh result)
|
|
||||||
session: CloudUser = Depends(get_session),
|
session: CloudUser = Depends(get_session),
|
||||||
):
|
):
|
||||||
"""Async variant of GET /api/search.
|
"""Async variant of GET /api/search.
|
||||||
|
|
@ -1123,11 +923,10 @@ def search_async(
|
||||||
_tier = session.tier
|
_tier = session.tier
|
||||||
_user_id = session.user_id
|
_user_id = session.user_id
|
||||||
_affiliate_active = bool(os.environ.get("EBAY_AFFILIATE_CAMPAIGN_ID", "").strip())
|
_affiliate_active = bool(os.environ.get("EBAY_AFFILIATE_CAMPAIGN_ID", "").strip())
|
||||||
_refresh = refresh # capture before the closure is dispatched
|
|
||||||
|
|
||||||
def _background_search() -> None:
|
def _background_search() -> None:
|
||||||
"""Run the full search pipeline and push SSE events to the queue."""
|
"""Run the full search pipeline and push SSE events to the queue."""
|
||||||
import hashlib as _hashlib_local
|
import hashlib as _hashlib
|
||||||
import sqlite3 as _sqlite3
|
import sqlite3 as _sqlite3
|
||||||
|
|
||||||
q_norm = q # captured from outer scope
|
q_norm = q # captured from outer scope
|
||||||
|
|
@ -1166,100 +965,6 @@ def search_async(
|
||||||
if sq is not None:
|
if sq is not None:
|
||||||
sq.put(event)
|
sq.put(event)
|
||||||
|
|
||||||
# ── Cache lookup (async background worker) ────────────────────────────
|
|
||||||
async_cache_key = _cache_key(
|
|
||||||
q_norm, max_price, min_price, pages,
|
|
||||||
must_include, must_include_mode, must_exclude, category_id,
|
|
||||||
)
|
|
||||||
|
|
||||||
if not _refresh:
|
|
||||||
cached = _search_result_cache.get(async_cache_key)
|
|
||||||
if cached is not None:
|
|
||||||
payload, expiry = cached
|
|
||||||
if expiry > _time.time():
|
|
||||||
log.info("cache: hit key=%s q=%r", async_cache_key, q_norm)
|
|
||||||
from app.db.models import Listing as _Listing
|
|
||||||
cached_listings_raw = payload["listings"]
|
|
||||||
cached_market_price = payload["market_price"]
|
|
||||||
try:
|
|
||||||
shared_store = Store(_shared_db)
|
|
||||||
user_store = Store(_user_db)
|
|
||||||
listings = [_Listing(**d) for d in cached_listings_raw]
|
|
||||||
user_store.save_listings(listings)
|
|
||||||
staged = user_store.get_listings_staged(
|
|
||||||
"ebay", [l.platform_listing_id for l in listings]
|
|
||||||
)
|
|
||||||
listings = [staged.get(l.platform_listing_id, l) for l in listings]
|
|
||||||
|
|
||||||
scorer = TrustScorer(shared_store)
|
|
||||||
trust_scores_list = scorer.score_batch(listings, q_norm)
|
|
||||||
user_store.save_trust_scores(trust_scores_list)
|
|
||||||
|
|
||||||
features_obj = compute_features(_tier)
|
|
||||||
if features_obj.photo_analysis:
|
|
||||||
from api.cloud_session import CloudUser as _CloudUser
|
|
||||||
_sess_stub = _CloudUser(
|
|
||||||
user_id=_user_id, tier=_tier,
|
|
||||||
shared_db=_shared_db, user_db=_user_db,
|
|
||||||
)
|
|
||||||
_enqueue_vision_tasks(listings, trust_scores_list, _sess_stub)
|
|
||||||
|
|
||||||
trust_map = {
|
|
||||||
listing.platform_listing_id: dataclasses.asdict(ts)
|
|
||||||
for listing, ts in zip(listings, trust_scores_list)
|
|
||||||
if ts is not None
|
|
||||||
}
|
|
||||||
seller_map = {
|
|
||||||
listing.seller_platform_id: dataclasses.asdict(
|
|
||||||
shared_store.get_seller("ebay", listing.seller_platform_id)
|
|
||||||
)
|
|
||||||
for listing in listings
|
|
||||||
if listing.seller_platform_id
|
|
||||||
and shared_store.get_seller("ebay", listing.seller_platform_id)
|
|
||||||
}
|
|
||||||
|
|
||||||
_is_unauthed = _user_id == "anonymous" or _user_id.startswith("guest:")
|
|
||||||
_pref_store_hit = None if _is_unauthed else user_store
|
|
||||||
|
|
||||||
def _get_pref_hit(uid: Optional[str], path: str, default=None):
|
|
||||||
return _pref_store_hit.get_user_preference(path, default=default) # type: ignore[union-attr]
|
|
||||||
|
|
||||||
def _serialize_hit(l: object) -> dict:
|
|
||||||
d = dataclasses.asdict(l)
|
|
||||||
d["url"] = _wrap_affiliate_url(
|
|
||||||
d["url"],
|
|
||||||
retailer="ebay",
|
|
||||||
user_id=None if _is_unauthed else _user_id,
|
|
||||||
get_preference=_get_pref_hit if _pref_store_hit is not None else None,
|
|
||||||
)
|
|
||||||
return d
|
|
||||||
|
|
||||||
_push({
|
|
||||||
"type": "listings",
|
|
||||||
"listings": [_serialize_hit(l) for l in listings],
|
|
||||||
"trust_scores": trust_map,
|
|
||||||
"sellers": seller_map,
|
|
||||||
"market_price": cached_market_price,
|
|
||||||
"adapter_used": adapter_used,
|
|
||||||
"affiliate_active": _affiliate_active,
|
|
||||||
"session_id": session_id,
|
|
||||||
})
|
|
||||||
# Enrichment still runs so live score updates flow.
|
|
||||||
_trigger_scraper_enrichment(
|
|
||||||
listings, shared_store, _shared_db,
|
|
||||||
user_db=_user_db, query=comp_query, session_id=session_id,
|
|
||||||
)
|
|
||||||
return # done — no scraping needed
|
|
||||||
except Exception as exc:
|
|
||||||
log.warning(
|
|
||||||
"cache hit path failed, falling through to scrape: %s", exc
|
|
||||||
)
|
|
||||||
# Fall through to full scrape below.
|
|
||||||
|
|
||||||
# ── Cache miss — evict stale entries, then scrape ─────────────────────
|
|
||||||
_evict_expired_cache()
|
|
||||||
log.info("cache: miss key=%s q=%r", async_cache_key, q_norm)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
def _run_search(ebay_query: str) -> list:
|
def _run_search(ebay_query: str) -> list:
|
||||||
return _make_adapter(Store(_shared_db), adapter).search(ebay_query, base_filters)
|
return _make_adapter(Store(_shared_db), adapter).search(ebay_query, base_filters)
|
||||||
|
|
@ -1333,16 +1038,10 @@ def search_async(
|
||||||
if features_obj.photo_analysis:
|
if features_obj.photo_analysis:
|
||||||
_enqueue_vision_tasks(listings, trust_scores_list, _session_stub)
|
_enqueue_vision_tasks(listings, trust_scores_list, _session_stub)
|
||||||
|
|
||||||
query_hash = _hashlib_local.md5(comp_query.encode()).hexdigest()
|
query_hash = _hashlib.md5(comp_query.encode()).hexdigest()
|
||||||
comp = shared_store.get_market_comp("ebay", query_hash)
|
comp = shared_store.get_market_comp("ebay", query_hash)
|
||||||
market_price = comp.median_price if comp else None
|
market_price = comp.median_price if comp else None
|
||||||
|
|
||||||
# Store raw listings + market_price in cache (trust scores excluded).
|
|
||||||
_search_result_cache[async_cache_key] = (
|
|
||||||
{"listings": [dataclasses.asdict(l) for l in listings], "market_price": market_price},
|
|
||||||
_time.time() + _SEARCH_CACHE_TTL,
|
|
||||||
)
|
|
||||||
|
|
||||||
trust_map = {
|
trust_map = {
|
||||||
listing.platform_listing_id: dataclasses.asdict(ts)
|
listing.platform_listing_id: dataclasses.asdict(ts)
|
||||||
for listing, ts in zip(listings, trust_scores_list)
|
for listing, ts in zip(listings, trust_scores_list)
|
||||||
|
|
|
||||||
|
|
@ -1,402 +0,0 @@
|
||||||
"""Tests for the short-TTL search result cache in api/main.py.
|
|
||||||
|
|
||||||
Covers:
|
|
||||||
- _cache_key stability (same inputs → same key)
|
|
||||||
- _cache_key uniqueness (different inputs → different keys)
|
|
||||||
- cache hit path returns early without scraping (async worker)
|
|
||||||
- cache miss path stores result in _search_result_cache
|
|
||||||
- refresh=True bypasses cache read (still writes fresh result)
|
|
||||||
- TTL expiry: expired entries are not returned as hits
|
|
||||||
- _evict_expired_cache removes expired entries
|
|
||||||
"""
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import os
|
|
||||||
import queue as _queue
|
|
||||||
import time
|
|
||||||
from pathlib import Path
|
|
||||||
from unittest.mock import MagicMock, patch
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
|
|
||||||
# ── Helpers ───────────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def _clear_cache():
|
|
||||||
"""Reset module-level cache state between tests."""
|
|
||||||
import api.main as _main
|
|
||||||
_main._search_result_cache.clear()
|
|
||||||
_main._last_eviction_ts = 0.0
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(autouse=True)
|
|
||||||
def isolated_cache():
|
|
||||||
"""Ensure each test starts with an empty cache."""
|
|
||||||
_clear_cache()
|
|
||||||
yield
|
|
||||||
_clear_cache()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def client(tmp_path):
|
|
||||||
"""TestClient backed by a fresh tmp DB."""
|
|
||||||
os.environ["SNIPE_DB"] = str(tmp_path / "snipe.db")
|
|
||||||
from api.main import app
|
|
||||||
from fastapi.testclient import TestClient
|
|
||||||
return TestClient(app, raise_server_exceptions=False)
|
|
||||||
|
|
||||||
|
|
||||||
def _make_mock_listing(listing_id: str = "123456789", seller_id: str = "test_seller"):
|
|
||||||
"""Return a MagicMock listing (for use where asdict() is NOT called on it)."""
|
|
||||||
m = MagicMock()
|
|
||||||
m.platform_listing_id = listing_id
|
|
||||||
m.seller_platform_id = seller_id
|
|
||||||
m.title = "Test GPU"
|
|
||||||
m.price = 100.0
|
|
||||||
m.currency = "USD"
|
|
||||||
m.condition = "Used"
|
|
||||||
m.url = f"https://www.ebay.com/itm/{listing_id}"
|
|
||||||
m.photo_urls = []
|
|
||||||
m.listing_age_days = 5
|
|
||||||
m.buying_format = "fixed_price"
|
|
||||||
m.ends_at = None
|
|
||||||
m.fetched_at = None
|
|
||||||
m.trust_score_id = None
|
|
||||||
m.id = 1
|
|
||||||
m.category_name = None
|
|
||||||
return m
|
|
||||||
|
|
||||||
|
|
||||||
def _make_real_listing(listing_id: str = "123456789", seller_id: str = "test_seller"):
|
|
||||||
"""Return a real Listing dataclass instance (for use where asdict() is called)."""
|
|
||||||
from app.db.models import Listing
|
|
||||||
return Listing(
|
|
||||||
platform="ebay",
|
|
||||||
platform_listing_id=listing_id,
|
|
||||||
title="Test GPU",
|
|
||||||
price=100.0,
|
|
||||||
currency="USD",
|
|
||||||
condition="Used",
|
|
||||||
seller_platform_id=seller_id,
|
|
||||||
url=f"https://www.ebay.com/itm/{listing_id}",
|
|
||||||
photo_urls=[],
|
|
||||||
listing_age_days=5,
|
|
||||||
buying_format="fixed_price",
|
|
||||||
id=None,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# ── _cache_key unit tests ─────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def test_cache_key_stable_for_same_inputs():
|
|
||||||
"""The same parameter set always produces the same key."""
|
|
||||||
from api.main import _cache_key
|
|
||||||
k1 = _cache_key("rtx 3080", 400.0, 100.0, 2, "rtx,3080", "all", "mining", "27386")
|
|
||||||
k2 = _cache_key("rtx 3080", 400.0, 100.0, 2, "rtx,3080", "all", "mining", "27386")
|
|
||||||
assert k1 == k2
|
|
||||||
|
|
||||||
|
|
||||||
def test_cache_key_case_normalised():
|
|
||||||
"""Query is normalised to lower-case + stripped before hashing."""
|
|
||||||
from api.main import _cache_key
|
|
||||||
k1 = _cache_key("RTX 3080", None, None, 1, "", "all", "", "")
|
|
||||||
k2 = _cache_key("rtx 3080", None, None, 1, "", "all", "", "")
|
|
||||||
assert k1 == k2
|
|
||||||
|
|
||||||
|
|
||||||
def test_cache_key_differs_on_query_change():
|
|
||||||
"""Different query strings must produce different keys."""
|
|
||||||
from api.main import _cache_key
|
|
||||||
k1 = _cache_key("rtx 3080", None, None, 1, "", "all", "", "")
|
|
||||||
k2 = _cache_key("gtx 1080", None, None, 1, "", "all", "", "")
|
|
||||||
assert k1 != k2
|
|
||||||
|
|
||||||
|
|
||||||
def test_cache_key_differs_on_price_filter():
|
|
||||||
"""Different max_price must produce a different key."""
|
|
||||||
from api.main import _cache_key
|
|
||||||
k1 = _cache_key("gpu", 400.0, None, 1, "", "all", "", "")
|
|
||||||
k2 = _cache_key("gpu", 500.0, None, 1, "", "all", "", "")
|
|
||||||
assert k1 != k2
|
|
||||||
|
|
||||||
|
|
||||||
def test_cache_key_differs_on_min_price():
|
|
||||||
"""Different min_price must produce a different key."""
|
|
||||||
from api.main import _cache_key
|
|
||||||
k1 = _cache_key("gpu", None, 50.0, 1, "", "all", "", "")
|
|
||||||
k2 = _cache_key("gpu", None, 100.0, 1, "", "all", "", "")
|
|
||||||
assert k1 != k2
|
|
||||||
|
|
||||||
|
|
||||||
def test_cache_key_differs_on_pages():
|
|
||||||
"""Different page count must produce a different key."""
|
|
||||||
from api.main import _cache_key
|
|
||||||
k1 = _cache_key("gpu", None, None, 1, "", "all", "", "")
|
|
||||||
k2 = _cache_key("gpu", None, None, 2, "", "all", "", "")
|
|
||||||
assert k1 != k2
|
|
||||||
|
|
||||||
|
|
||||||
def test_cache_key_differs_on_must_include():
|
|
||||||
"""Different must_include terms must produce a different key."""
|
|
||||||
from api.main import _cache_key
|
|
||||||
k1 = _cache_key("gpu", None, None, 1, "rtx", "all", "", "")
|
|
||||||
k2 = _cache_key("gpu", None, None, 1, "gtx", "all", "", "")
|
|
||||||
assert k1 != k2
|
|
||||||
|
|
||||||
|
|
||||||
def test_cache_key_differs_on_must_exclude():
|
|
||||||
"""Different must_exclude terms must produce a different key."""
|
|
||||||
from api.main import _cache_key
|
|
||||||
k1 = _cache_key("gpu", None, None, 1, "", "all", "mining", "")
|
|
||||||
k2 = _cache_key("gpu", None, None, 1, "", "all", "defective", "")
|
|
||||||
assert k1 != k2
|
|
||||||
|
|
||||||
|
|
||||||
def test_cache_key_differs_on_category_id():
|
|
||||||
"""Different category_id must produce a different key."""
|
|
||||||
from api.main import _cache_key
|
|
||||||
k1 = _cache_key("gpu", None, None, 1, "", "all", "", "27386")
|
|
||||||
k2 = _cache_key("gpu", None, None, 1, "", "all", "", "12345")
|
|
||||||
assert k1 != k2
|
|
||||||
|
|
||||||
|
|
||||||
def test_cache_key_is_16_chars():
|
|
||||||
"""Key must be exactly 16 hex characters."""
|
|
||||||
from api.main import _cache_key
|
|
||||||
k = _cache_key("gpu", None, None, 1, "", "all", "", "")
|
|
||||||
assert len(k) == 16
|
|
||||||
assert all(c in "0123456789abcdef" for c in k)
|
|
||||||
|
|
||||||
|
|
||||||
# ── TTL / eviction unit tests ─────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def test_expired_entry_is_not_returned_as_hit():
|
|
||||||
"""An entry past its TTL must not be treated as a cache hit."""
|
|
||||||
import api.main as _main
|
|
||||||
from api.main import _cache_key
|
|
||||||
|
|
||||||
key = _cache_key("gpu", None, None, 1, "", "all", "", "")
|
|
||||||
# Write an already-expired entry.
|
|
||||||
_main._search_result_cache[key] = (
|
|
||||||
{"listings": [], "market_price": None},
|
|
||||||
time.time() - 1.0, # expired 1 second ago
|
|
||||||
)
|
|
||||||
|
|
||||||
cached = _main._search_result_cache.get(key)
|
|
||||||
assert cached is not None
|
|
||||||
payload, expiry = cached
|
|
||||||
# Simulate the hit-check used in main.py
|
|
||||||
assert expiry <= time.time(), "Entry should be expired"
|
|
||||||
|
|
||||||
|
|
||||||
def test_evict_expired_cache_removes_stale_entries():
|
|
||||||
"""_evict_expired_cache must remove entries whose expiry has passed."""
|
|
||||||
import api.main as _main
|
|
||||||
from api.main import _cache_key, _evict_expired_cache
|
|
||||||
|
|
||||||
key_expired = _cache_key("old query", None, None, 1, "", "all", "", "")
|
|
||||||
key_valid = _cache_key("new query", None, None, 1, "", "all", "", "")
|
|
||||||
|
|
||||||
_main._search_result_cache[key_expired] = (
|
|
||||||
{"listings": [], "market_price": None},
|
|
||||||
time.time() - 10.0, # already expired
|
|
||||||
)
|
|
||||||
_main._search_result_cache[key_valid] = (
|
|
||||||
{"listings": [], "market_price": 99.0},
|
|
||||||
time.time() + 300.0, # valid for 5 min
|
|
||||||
)
|
|
||||||
|
|
||||||
# Reset throttle so eviction runs immediately.
|
|
||||||
_main._last_eviction_ts = 0.0
|
|
||||||
_evict_expired_cache()
|
|
||||||
|
|
||||||
assert key_expired not in _main._search_result_cache
|
|
||||||
assert key_valid in _main._search_result_cache
|
|
||||||
|
|
||||||
|
|
||||||
def test_evict_is_rate_limited():
|
|
||||||
"""_evict_expired_cache should skip eviction if called within 60 s."""
|
|
||||||
import api.main as _main
|
|
||||||
from api.main import _cache_key, _evict_expired_cache
|
|
||||||
|
|
||||||
key_expired = _cache_key("stale", None, None, 1, "", "all", "", "")
|
|
||||||
_main._search_result_cache[key_expired] = (
|
|
||||||
{"listings": [], "market_price": None},
|
|
||||||
time.time() - 5.0,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Pretend eviction just ran.
|
|
||||||
_main._last_eviction_ts = time.time()
|
|
||||||
_evict_expired_cache()
|
|
||||||
|
|
||||||
# Entry should still be present because eviction was throttled.
|
|
||||||
assert key_expired in _main._search_result_cache
|
|
||||||
|
|
||||||
|
|
||||||
# ── Integration tests — async endpoint cache hit ──────────────────────────────
|
|
||||||
|
|
||||||
def test_async_cache_hit_skips_scraper(client, tmp_path):
    """On a warm cache hit the scraper adapter must not be called.

    Seeds a valid cache entry for the query, fires the async search endpoint,
    and asserts the (mocked) adapter's ``search`` never runs.  The seeded
    entry is removed in a ``finally`` so it cannot leak into other tests that
    use the same "rtx 3080" cache key (defect fix: the original left it
    behind).  The negative wait is kept short — on the passing path it always
    runs to its timeout, so a long timeout only slows the suite.
    """
    import threading

    import api.main as _main
    from api.main import _cache_key

    # Pre-seed a valid cache entry.
    key = _cache_key("rtx 3080", None, None, 1, "", "all", "", "")
    _main._search_result_cache[key] = (
        {"listings": [], "market_price": 250.0},
        time.time() + 300.0,
    )

    scraper_called = threading.Event()

    def _fake_search(query, filters):
        scraper_called.set()
        return []

    try:
        with (
            patch("api.main._make_adapter") as mock_adapter_factory,
            patch("api.main._trigger_scraper_enrichment"),
            patch("api.main.TrustScorer") as mock_scorer_cls,
            patch("api.main.Store") as mock_store_cls,
        ):
            mock_adapter = MagicMock()
            mock_adapter.search.side_effect = _fake_search
            mock_adapter.get_completed_sales.return_value = None
            mock_adapter_factory.return_value = mock_adapter

            mock_scorer = MagicMock()
            mock_scorer.score_batch.return_value = []
            mock_scorer_cls.return_value = mock_scorer

            mock_store = MagicMock()
            mock_store.get_listings_staged.return_value = {}
            mock_store.refresh_seller_categories.return_value = 0
            mock_store.save_listings.return_value = None
            mock_store.save_trust_scores.return_value = None
            mock_store.get_market_comp.return_value = None
            mock_store.get_seller.return_value = None
            mock_store.get_user_preference.return_value = None
            mock_store_cls.return_value = mock_store

            resp = client.get("/api/search/async?q=rtx+3080")
            assert resp.status_code == 202

            # Give the background worker a moment to run.  This is a negative
            # check, so the wait always lasts the full timeout when passing.
            scraper_called.wait(timeout=1.0)

            # Scraper must NOT have been called on a cache hit.
            assert not scraper_called.is_set(), "Scraper was called despite a warm cache hit"
    finally:
        # Remove the seeded entry so other tests see a cold cache.
        _main._search_result_cache.pop(key, None)
|
||||||
|
|
||||||
def test_async_cache_miss_stores_result(client, tmp_path):
    """After a cache miss the result must be stored in _search_result_cache.

    Defect fix: the original never guaranteed a *cold* cache — sibling tests
    seed the identical ``_cache_key("rtx 3080", ...)`` key, and a leftover
    warm entry would silently turn this "miss" into a hit.  The key is now
    popped before the request and again in a ``finally`` so this test neither
    depends on nor pollutes shared cache state.
    """
    import threading

    import api.main as _main
    from api.main import _cache_key

    search_done = threading.Event()
    real_listing = _make_real_listing()

    # Ensure the cache is cold for this key before we exercise the miss path.
    key = _cache_key("rtx 3080", None, None, 1, "", "all", "", "")
    _main._search_result_cache.pop(key, None)

    def _fake_search(query, filters):
        return [real_listing]

    try:
        with (
            patch("api.main._make_adapter") as mock_adapter_factory,
            patch("api.main._trigger_scraper_enrichment") as mock_enrich,
            patch("api.main.TrustScorer") as mock_scorer_cls,
            patch("api.main.Store") as mock_store_cls,
        ):
            mock_adapter = MagicMock()
            mock_adapter.search.side_effect = _fake_search
            mock_adapter.get_completed_sales.return_value = None
            mock_adapter_factory.return_value = mock_adapter

            mock_scorer = MagicMock()
            mock_scorer.score_batch.return_value = []
            mock_scorer_cls.return_value = mock_scorer

            mock_store = MagicMock()
            mock_store.get_listings_staged.return_value = {
                real_listing.platform_listing_id: real_listing
            }
            mock_store.refresh_seller_categories.return_value = 0
            mock_store.save_listings.return_value = None
            mock_store.save_trust_scores.return_value = None
            mock_store.get_market_comp.return_value = None
            mock_store.get_seller.return_value = None
            mock_store.get_user_preference.return_value = None
            mock_store_cls.return_value = mock_store

            def _enrich_side_effect(*args, **kwargs):
                search_done.set()

            mock_enrich.side_effect = _enrich_side_effect

            resp = client.get("/api/search/async?q=rtx+3080")
            assert resp.status_code == 202

            # Wait until the background worker reaches _trigger_scraper_enrichment.
            search_done.wait(timeout=5.0)

            assert search_done.is_set(), "Background search worker never completed"

            assert key in _main._search_result_cache, "Result was not stored in cache after miss"
            payload, expiry = _main._search_result_cache[key]
            assert expiry > time.time(), "Cache entry has already expired"
            assert "listings" in payload
    finally:
        # Drop the entry this test created so later tests start cold.
        _main._search_result_cache.pop(key, None)
|
||||||
|
|
||||||
# ── Integration tests — async endpoint refresh=True ──────────────────────────
|
|
||||||
|
|
||||||
def test_async_refresh_bypasses_cache_read(client, tmp_path):
    """refresh=True must bypass cache read and invoke the scraper.

    Seeds a valid cache entry, then requests the async search endpoint with
    ``refresh=true`` and asserts the (mocked) adapter's ``search`` *is*
    invoked despite the warm entry.  The seeded entry is removed in a
    ``finally`` so it cannot leak into other tests sharing the same
    "rtx 3080" cache key (defect fix: the original left it behind).
    """
    import threading

    import api.main as _main
    from api.main import _cache_key

    # Seed a valid cache entry so we can confirm it is bypassed.
    key = _cache_key("rtx 3080", None, None, 1, "", "all", "", "")
    _main._search_result_cache[key] = (
        {"listings": [], "market_price": 100.0},
        time.time() + 300.0,
    )

    scraper_called = threading.Event()

    def _fake_search(query, filters):
        scraper_called.set()
        return []

    try:
        with (
            patch("api.main._make_adapter") as mock_adapter_factory,
            patch("api.main._trigger_scraper_enrichment"),
            patch("api.main.TrustScorer") as mock_scorer_cls,
            patch("api.main.Store") as mock_store_cls,
        ):
            mock_adapter = MagicMock()
            mock_adapter.search.side_effect = _fake_search
            mock_adapter.get_completed_sales.return_value = None
            mock_adapter_factory.return_value = mock_adapter

            mock_scorer = MagicMock()
            mock_scorer.score_batch.return_value = []
            mock_scorer_cls.return_value = mock_scorer

            mock_store = MagicMock()
            mock_store.get_listings_staged.return_value = {}
            mock_store.refresh_seller_categories.return_value = 0
            mock_store.save_listings.return_value = None
            mock_store.save_trust_scores.return_value = None
            mock_store.get_market_comp.return_value = None
            mock_store.get_seller.return_value = None
            mock_store.get_user_preference.return_value = None
            mock_store_cls.return_value = mock_store

            resp = client.get("/api/search/async?q=rtx+3080&refresh=true")
            assert resp.status_code == 202

            scraper_called.wait(timeout=5.0)

            assert scraper_called.is_set(), "Scraper was not called even though refresh=True"
    finally:
        # Remove the seeded (and possibly refreshed) entry for test isolation.
        _main._search_result_cache.pop(key, None)
Loading…
Reference in a new issue