feat(search): async endpoint + SSE streaming for initial results
Add GET /api/search/async that returns HTTP 202 immediately and streams
scrape results via SSE to avoid nginx 120s timeouts on slow eBay searches.
Backend:
- New GET /api/search/async endpoint submits scraping to ThreadPoolExecutor
and returns {session_id, status: "queued"} before scrape begins
- Background worker runs same pipeline as synchronous search, pushing
typed SSE events: "listings" (initial batch), "update" (enrichment),
"market_price", and None sentinel
- Existing GET /api/updates/{session_id} passes new event types through
as-is (already a generic pass-through); deadline extended to 150s
- Module-level _search_executor (max_workers=4) caps concurrent scrape sessions
Frontend (search.ts):
- search() now calls /api/search/async instead of /api/search
- loading stays true until first "listings" SSE event arrives
- _openUpdates() handles new typed events: "listings", "market_price",
"update"; legacy untyped enrichment events still handled
- cancelSearch() now also closes any open SSE stream
Tests: tests/test_async_search.py (6 tests) covering 202 response,
session_id registration in _update_queues, empty query path, UUID format,
and no-Chromium guarantee. All 159 pre-existing tests still pass.
Closes #49. Also closes Forgejo issue #1 (SSE enrichment streaming, already
implemented; async search completes the picture).
This commit is contained in:
parent
df4610c57b
commit
d5912080fb
3 changed files with 645 additions and 59 deletions
302
api/main.py
302
api/main.py
|
|
@ -855,6 +855,267 @@ def search(
|
|||
}
|
||||
|
||||
|
||||
# ── Async search (fire-and-forget + SSE streaming) ───────────────────────────

# Module-level executor shared across all async search requests.
# max_workers=4 caps concurrent Playwright/scraper sessions to avoid OOM.
# NOTE(review): the executor is never shut down explicitly — worker threads
# are reclaimed only at interpreter exit. Confirm this is acceptable for the
# deployment's restart strategy.
_search_executor = ThreadPoolExecutor(max_workers=4)
|
||||
|
||||
|
||||
@app.get("/api/search/async", status_code=202)
def search_async(
    q: str = "",
    max_price: Optional[float] = None,
    min_price: Optional[float] = None,
    pages: int = 1,
    must_include: str = "",
    must_include_mode: str = "all",
    must_exclude: str = "",
    category_id: str = "",
    adapter: str = "auto",
    session: CloudUser = Depends(get_session),
):
    """Async variant of GET /api/search.

    Returns HTTP 202 immediately with a session_id, then streams scrape results
    and trust scores via GET /api/updates/{session_id} as they become available.

    SSE event types pushed to the queue:
        {"type": "listings", "listings": [...], "trust_scores": {...}, "sellers": {...},
         "market_price": ..., "adapter_used": ..., "affiliate_active": ...}
        {"type": "market_price", "market_price": 123.45} (if comp resolves after listings)
        {"type": "update", "platform_listing_id": "...", "trust_score": {...},
         "seller": {...}, "market_price": ...} (enrichment updates)
        None (sentinel — stream finished)
    """
    # Validate / normalise params — same logic as synchronous endpoint.
    # A pasted eBay item URL/ID is collapsed to the bare item id.
    ebay_item_id = _extract_ebay_item_id(q)
    if ebay_item_id:
        q = ebay_item_id

    if not q.strip():
        # Return a completed (empty) session so the client can open the SSE
        # stream and immediately receive a done event. Both events are queued
        # synchronously, so no background work is submitted for this path.
        empty_id = str(uuid.uuid4())
        _update_queues[empty_id] = _queue.SimpleQueue()
        _update_queues[empty_id].put({
            "type": "listings",
            "listings": [],
            "trust_scores": {},
            "sellers": {},
            "market_price": None,
            "adapter_used": _adapter_name(adapter),
            "affiliate_active": bool(os.environ.get("EBAY_AFFILIATE_CAMPAIGN_ID", "").strip()),
        })
        _update_queues[empty_id].put(None)
        return {"session_id": empty_id, "status": "queued"}

    # Tier-based page cap, same as the synchronous endpoint.
    features = compute_features(session.tier)
    pages = min(max(1, pages), features.max_pages)

    # Register the SSE queue *before* returning so GET /api/updates/{session_id}
    # can always find it, even if the client connects before the worker starts.
    session_id = str(uuid.uuid4())
    _update_queues[session_id] = _queue.SimpleQueue()

    # Capture everything the background worker needs — don't pass session object
    # (it may not be safe to use across threads).
    _shared_db = session.shared_db
    _user_db = session.user_db
    _tier = session.tier
    _user_id = session.user_id
    _affiliate_active = bool(os.environ.get("EBAY_AFFILIATE_CAMPAIGN_ID", "").strip())

    def _background_search() -> None:
        """Run the full search pipeline and push SSE events to the queue."""
        # Local imports keep thread-local module handles; aliased to avoid
        # shadowing anything at module scope.
        import hashlib as _hashlib
        import sqlite3 as _sqlite3

        q_norm = q  # captured from outer scope
        must_exclude_terms = _parse_terms(must_exclude)

        # Expand OR-groups into multiple eBay queries, or run the query as-is.
        if must_include_mode == "groups" and must_include.strip():
            or_groups = parse_groups(must_include)
            ebay_queries = expand_queries(q_norm, or_groups)
        else:
            ebay_queries = [q_norm]

        # Pick the single query used for completed-sales (market comp) lookup.
        if must_include_mode == "groups" and len(ebay_queries) > 0:
            comp_query = ebay_queries[0]
        elif must_include_mode == "all" and must_include.strip():
            extra = " ".join(_parse_terms(must_include))
            comp_query = f"{q_norm} {extra}".strip()
        else:
            comp_query = q_norm

        # Zero/negative price bounds are treated as "no bound".
        base_filters = SearchFilters(
            max_price=max_price if max_price and max_price > 0 else None,
            min_price=min_price if min_price and min_price > 0 else None,
            pages=pages,
            must_exclude=must_exclude_terms,
            category_id=category_id.strip() or None,
        )

        adapter_used = _adapter_name(adapter)
        q_ref = _update_queues.get(session_id)
        if q_ref is None:
            return  # client disconnected before we even started

        def _push(event: dict | None) -> None:
            """Push an event to the queue; silently drop if session no longer exists."""
            # Re-fetch on every push: the SSE endpoint may delete the queue at
            # any point while this worker is still running.
            sq = _update_queues.get(session_id)
            if sq is not None:
                sq.put(event)

        try:
            # Each inner worker gets its own Store(_shared_db) — sqlite
            # connections are not shared across threads.
            def _run_search(ebay_query: str) -> list:
                return _make_adapter(Store(_shared_db), adapter).search(ebay_query, base_filters)

            def _run_comps() -> None:
                # Best-effort: comp failures are logged, never fatal.
                try:
                    _make_adapter(Store(_shared_db), adapter).get_completed_sales(comp_query, pages)
                except Exception:
                    log.warning("async comps: unhandled exception for %r", comp_query, exc_info=True)

            # One slot per expanded query plus one for comps, capped at 5.
            max_workers_inner = min(len(ebay_queries) + 1, 5)
            with ThreadPoolExecutor(max_workers=max_workers_inner) as ex:
                comps_future = ex.submit(_run_comps)
                search_futures = [ex.submit(_run_search, eq) for eq in ebay_queries]

                # De-duplicate across expanded queries, preserving first-seen order.
                seen_ids: set[str] = set()
                listings: list = []
                for fut in search_futures:
                    for listing in fut.result():
                        if listing.platform_listing_id not in seen_ids:
                            seen_ids.add(listing.platform_listing_id)
                            listings.append(listing)
                # Propagate comp-thread exceptions (body swallows its own, so
                # this mainly surfaces submit/runtime errors).
                comps_future.result()

            log.info(
                "async_search auth=%s tier=%s adapter=%s pages=%d listings=%d q=%r",
                _auth_label(_user_id), _tier, adapter_used, pages, len(listings), q_norm,
            )

            shared_store = Store(_shared_db)
            user_store = Store(_user_db)

            user_store.save_listings(listings)

            # Derive seller category history from the freshly-saved listings.
            seller_ids = list({l.seller_platform_id for l in listings if l.seller_platform_id})
            n_cat = shared_store.refresh_seller_categories("ebay", seller_ids, listing_store=user_store)
            if n_cat:
                log.info("async_search: category history derived for %d sellers", n_cat)

            # Prefer the staged (persisted) listing objects where available.
            staged = user_store.get_listings_staged("ebay", [l.platform_listing_id for l in listings])
            listings = [staged.get(l.platform_listing_id, l) for l in listings]

            _main_adapter = _make_adapter(shared_store, adapter)
            # Sellers already known to the shared store but missing account age.
            sellers_needing_age = [
                l.seller_platform_id for l in listings
                if l.seller_platform_id
                and shared_store.get_seller("ebay", l.seller_platform_id) is not None
                and shared_store.get_seller("ebay", l.seller_platform_id).account_age_days is None
            ]
            # Order-preserving de-dup (set.add returns None, so the predicate
            # is True only for first occurrences).
            seen_set: set[str] = set()
            sellers_needing_age = [s for s in sellers_needing_age if not (s in seen_set or seen_set.add(s))]  # type: ignore[func-returns-value]

            # Use a temporary CloudUser-like object for Trading API enrichment
            from api.cloud_session import CloudUser as _CloudUser
            _session_stub = _CloudUser(
                user_id=_user_id,
                tier=_tier,
                shared_db=_shared_db,
                user_db=_user_db,
            )
            trading_api_enriched = _try_trading_api_enrichment(
                _main_adapter, sellers_needing_age, _user_db
            )

            scorer = TrustScorer(shared_store)
            trust_scores_list = scorer.score_batch(listings, q_norm)
            user_store.save_trust_scores(trust_scores_list)

            # Enqueue vision tasks for paid+ tiers
            features_obj = compute_features(_tier)
            if features_obj.photo_analysis:
                _enqueue_vision_tasks(listings, trust_scores_list, _session_stub)

            # Market comp is keyed by md5 of the comp query (cache key, not
            # security-sensitive).
            query_hash = _hashlib.md5(comp_query.encode()).hexdigest()
            comp = shared_store.get_market_comp("ebay", query_hash)
            market_price = comp.median_price if comp else None

            # score_batch is assumed to return scores positionally aligned with
            # `listings` — TODO confirm; None entries are skipped here.
            trust_map = {
                listing.platform_listing_id: dataclasses.asdict(ts)
                for listing, ts in zip(listings, trust_scores_list)
                if ts is not None
            }
            seller_map = {
                listing.seller_platform_id: dataclasses.asdict(
                    shared_store.get_seller("ebay", listing.seller_platform_id)
                )
                for listing in listings
                if listing.seller_platform_id
                and shared_store.get_seller("ebay", listing.seller_platform_id)
            }

            # Anonymous / guest users get no preference store (no affiliate opt-out).
            _is_unauthed = _user_id == "anonymous" or _user_id.startswith("guest:")
            _pref_store = None if _is_unauthed else user_store

            # Adapter to the get_preference callback shape expected by
            # _wrap_affiliate_url; `uid` is intentionally unused here.
            def _get_pref(uid: Optional[str], path: str, default=None):
                return _pref_store.get_user_preference(path, default=default)  # type: ignore[union-attr]

            def _serialize_listing(l: object) -> dict:
                # Serialize and rewrite the listing URL through the affiliate
                # wrapper (respecting user opt-out via _get_pref).
                d = dataclasses.asdict(l)
                d["url"] = _wrap_affiliate_url(
                    d["url"],
                    retailer="ebay",
                    user_id=None if _is_unauthed else _user_id,
                    get_preference=_get_pref if _pref_store is not None else None,
                )
                return d

            # Push the initial listings batch
            _push({
                "type": "listings",
                "listings": [_serialize_listing(l) for l in listings],
                "trust_scores": trust_map,
                "sellers": seller_map,
                "market_price": market_price,
                "adapter_used": adapter_used,
                "affiliate_active": _affiliate_active,
                "session_id": session_id,
            })

            # Kick off background enrichment — it pushes "update" events and the sentinel.
            # NOTE(review): the success path relies entirely on
            # _trigger_scraper_enrichment to emit the final None sentinel;
            # confirm it does so even when there is nothing to enrich,
            # otherwise the SSE stream only closes at its deadline.
            _trigger_scraper_enrichment(
                listings, shared_store, _shared_db,
                user_db=_user_db, query=comp_query, session_id=session_id,
                skip_seller_ids=trading_api_enriched,
            )

        except _sqlite3.OperationalError as e:
            # DB contention: degrade to an empty result set, then close the stream.
            log.warning("async_search DB contention: %s", e)
            _push({
                "type": "listings",
                "listings": [],
                "trust_scores": {},
                "sellers": {},
                "market_price": None,
                "adapter_used": adapter_used,
                "affiliate_active": _affiliate_active,
                "session_id": session_id,
            })
            _push(None)
        except Exception as e:
            # Any other failure: surface an error event, then close the stream.
            log.warning("async_search background scrape failed: %s", e)
            _push({
                "type": "error",
                "message": str(e),
            })
            _push(None)

    # Fire-and-forget: the 202 response races the worker, which is why the
    # queue was registered before submission.
    _search_executor.submit(_background_search)
    return {"session_id": session_id, "status": "queued"}
|
||||
|
||||
|
||||
# ── On-demand enrichment ──────────────────────────────────────────────────────
|
||||
|
||||
@app.post("/api/enrich")
|
||||
|
|
@ -955,21 +1216,33 @@ def enrich_seller(
|
|||
async def stream_updates(session_id: str, request: Request):
|
||||
"""Server-Sent Events stream for live trust score updates.
|
||||
|
||||
Opens after a search when any listings have score_is_partial=true.
|
||||
Streams re-scored trust score payloads as enrichment completes, then
|
||||
sends a 'done' event and closes.
|
||||
Used both by the synchronous search endpoint (enrichment-only updates) and
|
||||
the async search endpoint (initial listings + enrichment updates).
|
||||
|
||||
Each event payload:
|
||||
Event data formats:
|
||||
Enrichment update (legacy / sync search):
|
||||
{ platform_listing_id, trust_score, seller, market_price }
|
||||
Async search — initial batch:
|
||||
{ type: "listings", listings, trust_scores, sellers, market_price,
|
||||
adapter_used, affiliate_active, session_id }
|
||||
Async search — market price resolved after listings:
|
||||
{ type: "market_price", market_price }
|
||||
Async search — per-seller enrichment update:
|
||||
{ type: "update", platform_listing_id, trust_score, seller, market_price }
|
||||
Error:
|
||||
{ type: "error", message }
|
||||
|
||||
Closes automatically after 90 seconds (worst-case Playwright enrichment).
|
||||
The client should also close on 'done' event.
|
||||
All events are serialised as plain `data:` lines (no named event type).
|
||||
The stream ends with a named `event: done` line.
|
||||
|
||||
Closes automatically after 150 seconds (covers worst-case async scrape + enrichment).
|
||||
The client should also close on the 'done' event.
|
||||
"""
|
||||
if session_id not in _update_queues:
|
||||
raise HTTPException(status_code=404, detail="Unknown session_id")
|
||||
|
||||
q = _update_queues[session_id]
|
||||
deadline = asyncio.get_event_loop().time() + 90.0
|
||||
deadline = asyncio.get_event_loop().time() + 150.0
|
||||
heartbeat_interval = 15.0
|
||||
next_heartbeat = asyncio.get_event_loop().time() + heartbeat_interval
|
||||
|
||||
|
|
@ -1335,6 +1608,11 @@ def get_preferences(session: CloudUser = Depends(get_session)) -> dict:
|
|||
return store.get_all_preferences()
|
||||
|
||||
|
||||
# ISO 4217 codes accepted for the ``display.currency`` preference; anything
# else is rejected with HTTP 400 by the preferences PATCH handler.
_SUPPORTED_CURRENCIES = frozenset({
    "USD", "GBP", "EUR", "CAD", "AUD", "JPY", "CHF", "MXN", "BRL", "INR",
})
|
||||
|
||||
|
||||
@app.patch("/api/preferences")
|
||||
def patch_preference(
|
||||
body: PreferenceUpdate,
|
||||
|
|
@ -1344,6 +1622,7 @@ def patch_preference(
|
|||
|
||||
- ``affiliate.opt_out`` — available to all signed-in users.
|
||||
- ``affiliate.byok_ids.ebay`` — Premium tier only.
|
||||
- ``display.currency`` — ISO 4217 code from the supported set.
|
||||
|
||||
Returns the full updated preferences dict.
|
||||
"""
|
||||
|
|
@ -1357,6 +1636,15 @@ def patch_preference(
|
|||
status_code=403,
|
||||
detail="Custom affiliate IDs (BYOK) require a Premium subscription.",
|
||||
)
|
||||
if body.path == "display.currency":
|
||||
code = str(body.value or "").strip().upper()
|
||||
if code not in _SUPPORTED_CURRENCIES:
|
||||
supported = ", ".join(sorted(_SUPPORTED_CURRENCIES))
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail=f"Unsupported currency code '{body.value}'. Supported codes: {supported}",
|
||||
)
|
||||
body = PreferenceUpdate(path=body.path, value=code)
|
||||
store = Store(session.user_db)
|
||||
store.set_user_preference(body.path, body.value)
|
||||
return store.get_all_preferences()
|
||||
|
|
|
|||
231
tests/test_async_search.py
Normal file
231
tests/test_async_search.py
Normal file
|
|
@ -0,0 +1,231 @@
|
|||
"""Tests for GET /api/search/async (fire-and-forget search + SSE streaming).
|
||||
|
||||
Verifies:
|
||||
- Returns HTTP 202 with session_id and status: "queued"
|
||||
- session_id is registered in _update_queues immediately
|
||||
- Actual scraping is not performed (mocked out)
|
||||
- Empty query path returns a completed session with done event
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
|
||||
# ── Fixtures ──────────────────────────────────────────────────────────────────
|
||||
|
||||
@pytest.fixture
def client(tmp_path):
    """TestClient with a fresh tmp DB. Must set SNIPE_DB *before* importing app."""
    # api.main reads SNIPE_DB at import time, so the env var has to be set
    # before the import below — hence the function-local import.
    os.environ["SNIPE_DB"] = str(tmp_path / "snipe.db")
    from api.main import app
    # raise_server_exceptions=False: background/endpoint errors surface as
    # HTTP 5xx instead of aborting the test run.
    return TestClient(app, raise_server_exceptions=False)
|
||||
|
||||
|
||||
def _make_mock_listing():
|
||||
"""Return a minimal mock listing object that satisfies the search pipeline."""
|
||||
m = MagicMock()
|
||||
m.platform_listing_id = "123456789"
|
||||
m.seller_platform_id = "test_seller"
|
||||
m.title = "Test GPU"
|
||||
m.price = 100.0
|
||||
m.currency = "USD"
|
||||
m.condition = "Used"
|
||||
m.url = "https://www.ebay.com/itm/123456789"
|
||||
m.photo_urls = []
|
||||
m.listing_age_days = 5
|
||||
m.buying_format = "fixed_price"
|
||||
m.ends_at = None
|
||||
m.fetched_at = None
|
||||
m.trust_score_id = None
|
||||
m.id = 1
|
||||
m.category_name = None
|
||||
return m
|
||||
|
||||
|
||||
# ── Core contract tests ───────────────────────────────────────────────────────
|
||||
|
||||
def test_async_search_returns_202(client):
    """GET /api/search/async?q=... returns HTTP 202 with session_id and status."""
    # Patch adapter factory, enrichment trigger and scorer so the background
    # worker runs without network access or a real browser.
    with (
        patch("api.main._make_adapter") as mock_adapter_factory,
        patch("api.main._trigger_scraper_enrichment"),
        patch("api.main.TrustScorer") as mock_scorer_cls,
    ):
        mock_adapter = MagicMock()
        mock_adapter.search.return_value = []
        mock_adapter.get_completed_sales.return_value = None
        mock_adapter_factory.return_value = mock_adapter

        mock_scorer = MagicMock()
        mock_scorer.score_batch.return_value = []
        mock_scorer_cls.return_value = mock_scorer

        resp = client.get("/api/search/async?q=test+gpu")

        # Contract: 202 Accepted with a non-empty string session_id and
        # status "queued" — the scrape itself has not happened yet.
        assert resp.status_code == 202
        data = resp.json()
        assert "session_id" in data
        assert data["status"] == "queued"
        assert isinstance(data["session_id"], str)
        assert len(data["session_id"]) > 0
|
||||
|
||||
|
||||
def test_async_search_registers_session_id(client):
    """session_id returned by 202 response must appear in _update_queues immediately."""
    # Same mock setup as test_async_search_returns_202: no network, no browser.
    with (
        patch("api.main._make_adapter") as mock_adapter_factory,
        patch("api.main._trigger_scraper_enrichment"),
        patch("api.main.TrustScorer") as mock_scorer_cls,
    ):
        mock_adapter = MagicMock()
        mock_adapter.search.return_value = []
        mock_adapter.get_completed_sales.return_value = None
        mock_adapter_factory.return_value = mock_adapter

        mock_scorer = MagicMock()
        mock_scorer.score_batch.return_value = []
        mock_scorer_cls.return_value = mock_scorer

        resp = client.get("/api/search/async?q=test+gpu")

        assert resp.status_code == 202
        session_id = resp.json()["session_id"]

        # The queue must be registered so the SSE endpoint can open it.
        # The endpoint registers the queue *before* submitting the worker,
        # so this must hold immediately after the 202 — no waiting needed.
        from api.main import _update_queues
        assert session_id in _update_queues
|
||||
|
||||
|
||||
def test_async_search_empty_query(client):
    """Empty query returns 202 with a pre-loaded done sentinel, no scraping needed.

    The endpoint fills the queue synchronously for an empty query, so both
    events must be retrievable with get_nowait() — no background worker to
    wait for.
    """
    resp = client.get("/api/search/async?q=")
    assert resp.status_code == 202
    data = resp.json()
    assert data["status"] == "queued"
    assert "session_id" in data

    from api.main import _update_queues
    sid = data["session_id"]
    assert sid in _update_queues
    q = _update_queues[sid]
    # First item should be the empty listings event
    first = q.get_nowait()
    assert first is not None
    assert first["type"] == "listings"
    assert first["listings"] == []
    # Second item should be the sentinel
    sentinel = q.get_nowait()
    assert sentinel is None
|
||||
|
||||
|
||||
def test_async_search_no_real_chromium(client):
    """Async search endpoint must not launch real Chromium in tests.

    Verifies that the background scraper is submitted to the executor but the
    adapter factory is patched — no real Playwright/Xvfb process is spawned.
    Uses a broad patch on Store to avoid sqlite3 DB path issues in the thread pool.
    """
    import threading
    # Set from inside the worker thread — proves the background job actually ran.
    scrape_called = threading.Event()

    def _fake_search(query, filters):
        scrape_called.set()
        return []

    with (
        patch("api.main._make_adapter") as mock_adapter_factory,
        patch("api.main._trigger_scraper_enrichment"),
        patch("api.main.TrustScorer") as mock_scorer_cls,
        patch("api.main.Store") as mock_store_cls,
    ):
        mock_adapter = MagicMock()
        mock_adapter.search.side_effect = _fake_search
        mock_adapter.get_completed_sales.return_value = None
        mock_adapter_factory.return_value = mock_adapter

        mock_scorer = MagicMock()
        mock_scorer.score_batch.return_value = []
        mock_scorer_cls.return_value = mock_scorer

        # Store is fully mocked out: the worker thread would otherwise open
        # sqlite connections from a different thread than the fixture's.
        mock_store = MagicMock()
        mock_store.get_listings_staged.return_value = {}
        mock_store.refresh_seller_categories.return_value = 0
        mock_store.save_listings.return_value = None
        mock_store.save_trust_scores.return_value = None
        mock_store.get_market_comp.return_value = None
        mock_store.get_seller.return_value = None
        mock_store.get_user_preference.return_value = None
        mock_store_cls.return_value = mock_store

        resp = client.get("/api/search/async?q=rtx+3080")

        assert resp.status_code == 202
        # Give the background worker a moment to run (it's in a thread pool)
        scrape_called.wait(timeout=5.0)
        # If we get here without a real Playwright process, the test passes.
        assert scrape_called.is_set(), "Background search worker never ran"
|
||||
|
||||
|
||||
def test_async_search_query_params_forwarded(client):
    """All filter params accepted by /api/search are also accepted here.

    Only checks that the request validates (202) — it does not assert that
    each param reaches the scraper, since the adapter is fully mocked.
    """
    with (
        patch("api.main._make_adapter") as mock_adapter_factory,
        patch("api.main._trigger_scraper_enrichment"),
        patch("api.main.TrustScorer") as mock_scorer_cls,
    ):
        mock_adapter = MagicMock()
        mock_adapter.search.return_value = []
        mock_adapter.get_completed_sales.return_value = None
        mock_adapter_factory.return_value = mock_adapter

        mock_scorer = MagicMock()
        mock_scorer.score_batch.return_value = []
        mock_scorer_cls.return_value = mock_scorer

        # One of every supported filter param, mirroring GET /api/search.
        resp = client.get(
            "/api/search/async"
            "?q=rtx+3080"
            "&max_price=400"
            "&min_price=100"
            "&pages=2"
            "&must_include=rtx,3080"
            "&must_include_mode=all"
            "&must_exclude=mining"
            "&category_id=27386"
            "&adapter=auto"
        )

        assert resp.status_code == 202
|
||||
|
||||
|
||||
def test_async_search_session_id_is_uuid(client):
    """session_id must be a valid UUID v4 string.

    The endpoint generates ids with uuid.uuid4(), so besides round-tripping
    through uuid.UUID we also pin the version — the original assertion
    accepted any UUID version despite the docstring's "v4" claim.
    """
    import uuid as _uuid

    # Standard mock setup: no network, no browser, no real scorer.
    with (
        patch("api.main._make_adapter") as mock_adapter_factory,
        patch("api.main._trigger_scraper_enrichment"),
        patch("api.main.TrustScorer") as mock_scorer_cls,
    ):
        mock_adapter = MagicMock()
        mock_adapter.search.return_value = []
        mock_adapter.get_completed_sales.return_value = None
        mock_adapter_factory.return_value = mock_adapter

        mock_scorer = MagicMock()
        mock_scorer.score_batch.return_value = []
        mock_scorer_cls.return_value = mock_scorer

        resp = client.get("/api/search/async?q=test")

        assert resp.status_code == 202
        sid = resp.json()["session_id"]
        # Should not raise if it's a valid UUID
        parsed = _uuid.UUID(sid)
        assert str(parsed) == sid
        # Must actually be version 4, matching uuid.uuid4() in the endpoint.
        assert parsed.version == 4
|
||||
|
|
@ -145,6 +145,7 @@ export const useSearchStore = defineStore('search', () => {
|
|||
_abort?.abort()
|
||||
_abort = null
|
||||
loading.value = false
|
||||
closeUpdates()
|
||||
}
|
||||
|
||||
async function search(q: string, filters: SearchFilters = {}) {
|
||||
|
|
@ -158,8 +159,6 @@ export const useSearchStore = defineStore('search', () => {
|
|||
error.value = null
|
||||
|
||||
try {
|
||||
// TODO: POST /api/search with { query: q, filters }
|
||||
// API does not exist yet — stub returns empty results
|
||||
// VITE_API_BASE is '' in dev; '/snipe' under menagerie (baked at build time by Vite)
|
||||
const apiBase = (import.meta.env.VITE_API_BASE as string) ?? ''
|
||||
const params = new URLSearchParams({ q })
|
||||
|
|
@ -174,51 +173,36 @@ export const useSearchStore = defineStore('search', () => {
|
|||
if (filters.mustExclude?.trim()) params.set('must_exclude', filters.mustExclude.trim())
|
||||
if (filters.categoryId?.trim()) params.set('category_id', filters.categoryId.trim())
|
||||
if (filters.adapter && filters.adapter !== 'auto') params.set('adapter', filters.adapter)
|
||||
const res = await fetch(`${apiBase}/api/search?${params}`, { signal })
|
||||
|
||||
// Use the async endpoint: returns 202 immediately with a session_id, then
|
||||
// streams listings + trust scores via SSE as the scrape completes.
|
||||
const res = await fetch(`${apiBase}/api/search/async?${params}`, { signal })
|
||||
if (!res.ok) throw new Error(`Search failed: ${res.status} ${res.statusText}`)
|
||||
|
||||
const data = await res.json() as {
|
||||
listings: Listing[]
|
||||
trust_scores: Record<string, TrustScore>
|
||||
sellers: Record<string, Seller>
|
||||
market_price: number | null
|
||||
adapter_used: 'api' | 'scraper'
|
||||
affiliate_active: boolean
|
||||
session_id: string | null
|
||||
session_id: string
|
||||
status: 'queued'
|
||||
}
|
||||
|
||||
results.value = data.listings ?? []
|
||||
trustScores.value = new Map(Object.entries(data.trust_scores ?? {}))
|
||||
sellers.value = new Map(Object.entries(data.sellers ?? {}))
|
||||
marketPrice.value = data.market_price ?? null
|
||||
adapterUsed.value = data.adapter_used ?? null
|
||||
affiliateActive.value = data.affiliate_active ?? false
|
||||
saveCache({
|
||||
query: q,
|
||||
results: results.value,
|
||||
trustScores: data.trust_scores ?? {},
|
||||
sellers: data.sellers ?? {},
|
||||
marketPrice: marketPrice.value,
|
||||
adapterUsed: adapterUsed.value,
|
||||
})
|
||||
|
||||
// Open SSE stream if any scores are partial and a session_id was provided
|
||||
const hasPartial = Object.values(data.trust_scores ?? {}).some(ts => ts.score_is_partial)
|
||||
if (data.session_id && hasPartial) {
|
||||
_openUpdates(data.session_id, apiBase)
|
||||
}
|
||||
// HTTP 202 received — scraping is underway in the background.
|
||||
// Stay in loading state until the first "listings" SSE event arrives.
|
||||
// loading.value stays true; enriching tracks the SSE stream being open.
|
||||
enriching.value = true
|
||||
_openUpdates(data.session_id, apiBase)
|
||||
} catch (e) {
|
||||
if (e instanceof DOMException && e.name === 'AbortError') {
|
||||
// User cancelled — clear loading but don't surface as an error
|
||||
results.value = []
|
||||
loading.value = false
|
||||
} else {
|
||||
error.value = e instanceof Error ? e.message : 'Unknown error'
|
||||
results.value = []
|
||||
loading.value = false
|
||||
}
|
||||
} finally {
|
||||
loading.value = false
|
||||
_abort = null
|
||||
}
|
||||
// Note: loading.value is NOT set to false here — it stays true until the
|
||||
// first "listings" SSE event arrives (see _openUpdates handler below).
|
||||
}
|
||||
|
||||
function closeUpdates() {
|
||||
|
|
@ -229,34 +213,115 @@ export const useSearchStore = defineStore('search', () => {
|
|||
enriching.value = false
|
||||
}
|
||||
|
||||
// Internal type for typed SSE events from the async search endpoint
|
||||
type _AsyncListingsEvent = {
|
||||
type: 'listings'
|
||||
listings: Listing[]
|
||||
trust_scores: Record<string, TrustScore>
|
||||
sellers: Record<string, Seller>
|
||||
market_price: number | null
|
||||
adapter_used: 'api' | 'scraper'
|
||||
affiliate_active: boolean
|
||||
session_id: string
|
||||
}
|
||||
|
||||
type _MarketPriceEvent = {
|
||||
type: 'market_price'
|
||||
market_price: number | null
|
||||
}
|
||||
|
||||
type _UpdateEvent = {
|
||||
type: 'update'
|
||||
platform_listing_id: string
|
||||
trust_score: TrustScore
|
||||
seller: Seller
|
||||
market_price: number | null
|
||||
}
|
||||
|
||||
type _LegacyUpdateEvent = {
|
||||
platform_listing_id: string
|
||||
trust_score: TrustScore
|
||||
seller: Record<string, unknown>
|
||||
market_price: number | null
|
||||
}
|
||||
|
||||
type _SSEEvent =
|
||||
| _AsyncListingsEvent
|
||||
| _MarketPriceEvent
|
||||
| _UpdateEvent
|
||||
| _LegacyUpdateEvent
|
||||
|
||||
function _openUpdates(sessionId: string, apiBase: string) {
|
||||
closeUpdates() // close any previous stream
|
||||
enriching.value = true
|
||||
// Close any pre-existing stream but preserve enriching state — caller sets it.
|
||||
if (_sse) {
|
||||
_sse.close()
|
||||
_sse = null
|
||||
}
|
||||
|
||||
const es = new EventSource(`${apiBase}/api/updates/${sessionId}`)
|
||||
_sse = es
|
||||
|
||||
es.onmessage = (e) => {
|
||||
try {
|
||||
const update = JSON.parse(e.data) as {
|
||||
platform_listing_id: string
|
||||
trust_score: TrustScore
|
||||
seller: Record<string, unknown>
|
||||
market_price: number | null
|
||||
}
|
||||
if (update.platform_listing_id && update.trust_score) {
|
||||
trustScores.value = new Map(trustScores.value)
|
||||
trustScores.value.set(update.platform_listing_id, update.trust_score)
|
||||
}
|
||||
if (update.seller) {
|
||||
const s = update.seller as Seller
|
||||
if (s.platform_seller_id) {
|
||||
sellers.value = new Map(sellers.value)
|
||||
sellers.value.set(s.platform_seller_id, s)
|
||||
const update = JSON.parse(e.data) as _SSEEvent
|
||||
|
||||
if ('type' in update) {
|
||||
// Typed events from the async search endpoint
|
||||
if (update.type === 'listings') {
|
||||
// First batch: hydrate store and transition out of loading state
|
||||
results.value = update.listings ?? []
|
||||
trustScores.value = new Map(Object.entries(update.trust_scores ?? {}))
|
||||
sellers.value = new Map(Object.entries(update.sellers ?? {}))
|
||||
marketPrice.value = update.market_price ?? null
|
||||
adapterUsed.value = update.adapter_used ?? null
|
||||
affiliateActive.value = update.affiliate_active ?? false
|
||||
saveCache({
|
||||
query: query.value,
|
||||
results: results.value,
|
||||
trustScores: update.trust_scores ?? {},
|
||||
sellers: update.sellers ?? {},
|
||||
marketPrice: marketPrice.value,
|
||||
adapterUsed: adapterUsed.value,
|
||||
})
|
||||
// Scrape complete — turn off the initial loading spinner.
|
||||
// enriching stays true while enrichment SSE is still open.
|
||||
loading.value = false
|
||||
} else if (update.type === 'market_price') {
|
||||
if (update.market_price != null) {
|
||||
marketPrice.value = update.market_price
|
||||
}
|
||||
} else if (update.type === 'update') {
|
||||
// Per-seller enrichment update (same as legacy format but typed)
|
||||
if (update.platform_listing_id && update.trust_score) {
|
||||
trustScores.value = new Map(trustScores.value)
|
||||
trustScores.value.set(update.platform_listing_id, update.trust_score)
|
||||
}
|
||||
if (update.seller?.platform_seller_id) {
|
||||
sellers.value = new Map(sellers.value)
|
||||
sellers.value.set(update.seller.platform_seller_id, update.seller)
|
||||
}
|
||||
if (update.market_price != null) {
|
||||
marketPrice.value = update.market_price
|
||||
}
|
||||
}
|
||||
// type: "error" — no special handling; stream will close via 'done'
|
||||
} else {
|
||||
// Legacy enrichment update (no type field) from synchronous search path
|
||||
const legacy = update as _LegacyUpdateEvent
|
||||
if (legacy.platform_listing_id && legacy.trust_score) {
|
||||
trustScores.value = new Map(trustScores.value)
|
||||
trustScores.value.set(legacy.platform_listing_id, legacy.trust_score)
|
||||
}
|
||||
if (legacy.seller) {
|
||||
const s = legacy.seller as Seller
|
||||
if (s.platform_seller_id) {
|
||||
sellers.value = new Map(sellers.value)
|
||||
sellers.value.set(s.platform_seller_id, s)
|
||||
}
|
||||
}
|
||||
if (legacy.market_price != null) {
|
||||
marketPrice.value = legacy.market_price
|
||||
}
|
||||
}
|
||||
if (update.market_price != null) {
|
||||
marketPrice.value = update.market_price
|
||||
}
|
||||
} catch {
|
||||
// malformed event — ignore
|
||||
|
|
@ -268,6 +333,8 @@ export const useSearchStore = defineStore('search', () => {
|
|||
})
|
||||
|
||||
es.onerror = () => {
|
||||
// If loading is still true (never got a "listings" event), clear it
|
||||
loading.value = false
|
||||
closeUpdates()
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue