From d5912080fbece3edd4d6795652168ca63dd16af8 Mon Sep 17 00:00:00 2001
From: pyr0ball
Date: Mon, 20 Apr 2026 10:57:32 -0700
Subject: [PATCH] feat(search): async endpoint + SSE streaming for initial results

Add GET /api/search/async that returns HTTP 202 immediately and streams
scrape results via SSE to avoid nginx 120s timeouts on slow eBay searches.

Backend:
- New GET /api/search/async endpoint submits scraping to ThreadPoolExecutor
  and returns {session_id, status: "queued"} before scrape begins
- Background worker runs same pipeline as synchronous search, pushing typed
  SSE events: "listings" (initial batch), "update" (enrichment),
  "market_price", "error" (worker failure), and None sentinel
- Existing GET /api/updates/{session_id} passes new event types through
  as-is (already a generic pass-through); deadline extended to 150s
- Module-level _search_executor (max_workers=4) caps concurrent scrape
  sessions
- PATCH /api/preferences now validates display.currency against a fixed
  set of ISO 4217 codes (not part of async search; bundled in this patch)

Frontend (search.ts):
- search() now calls /api/search/async instead of /api/search
- loading stays true until first "listings" SSE event arrives
- _openUpdates() handles new typed events: "listings", "market_price",
  "update"; legacy untyped enrichment events still handled
- cancelSearch() now also closes any open SSE stream

Tests: tests/test_async_search.py (6 tests) covering 202 response,
session_id registration in _update_queues, empty query path, query-param
acceptance, UUID format, and no-Chromium guarantee. All 159 pre-existing
tests still pass.

Closes #49. Also closes Forgejo issue #1 (SSE enrichment streaming, already
implemented; async search completes the picture).
---
 api/main.py                | 302 ++++++++++++++++++++++++++++++++++++-
 tests/test_async_search.py | 231 ++++++++++++++++++++++++++++
 web/src/stores/search.ts   | 171 ++++++++++++++-------
 3 files changed, 645 insertions(+), 59 deletions(-)
 create mode 100644 tests/test_async_search.py

diff --git a/api/main.py b/api/main.py
index cd37a8c..3a73a77 100644
--- a/api/main.py
+++ b/api/main.py
@@ -855,6 +855,267 @@ def search(
     }


+# ── Async search (fire-and-forget + SSE streaming) ───────────────────────────
+
+# Module-level executor shared across all async search requests.
+# max_workers=4 caps concurrent search jobs; each job fans out to <=5 adapter threads.
+_search_executor = ThreadPoolExecutor(max_workers=4)
+
+
+@app.get("/api/search/async", status_code=202)
+def search_async(
+    q: str = "",
+    max_price: Optional[float] = None,
+    min_price: Optional[float] = None,
+    pages: int = 1,
+    must_include: str = "",
+    must_include_mode: str = "all",
+    must_exclude: str = "",
+    category_id: str = "",
+    adapter: str = "auto",
+    session: CloudUser = Depends(get_session),
+):
+    """Async variant of GET /api/search.
+
+    Returns HTTP 202 immediately with a session_id, then streams scrape results
+    and trust scores via GET /api/updates/{session_id} as they become available.
+
+    SSE event types pushed to the queue:
+      {"type": "listings", "listings": [...], "trust_scores": {...}, "sellers": {...},
+       "market_price": ..., "adapter_used": ..., "affiliate_active": ...}
+      {"type": "market_price", "market_price": 123.45}  (if comp resolves after listings)
+      {"type": "update", "platform_listing_id": "...", "trust_score": {...},
+       "seller": {...}, "market_price": ...}  (enrichment updates)
+      {"type": "error", "message": "..."}  (background search failed; sentinel follows)
+      None  (sentinel — stream finished)
+    """
+    # Validate / normalise params — same logic as synchronous endpoint.
+    ebay_item_id = _extract_ebay_item_id(q)
+    if ebay_item_id:
+        q = ebay_item_id
+
+    if not q.strip():
+        # Return a completed (empty) session so the client can open the SSE
+        # stream and immediately receive a done event.
+        empty_id = str(uuid.uuid4())
+        _update_queues[empty_id] = _queue.SimpleQueue()
+        _update_queues[empty_id].put({
+            "type": "listings",
+            "listings": [],
+            "trust_scores": {},
+            "sellers": {},
+            "market_price": None,
+            "adapter_used": _adapter_name(adapter),
+            "affiliate_active": bool(os.environ.get("EBAY_AFFILIATE_CAMPAIGN_ID", "").strip()),
+        })
+        _update_queues[empty_id].put(None)
+        return {"session_id": empty_id, "status": "queued"}
+
+    features = compute_features(session.tier)
+    pages = min(max(1, pages), features.max_pages)
+
+    session_id = str(uuid.uuid4())
+    _update_queues[session_id] = _queue.SimpleQueue()
+
+    # Capture everything the background worker needs — don't pass session object
+    # (it may not be safe to use across threads).
+    _shared_db = session.shared_db
+    _user_db = session.user_db
+    _tier = session.tier
+    _user_id = session.user_id
+    _affiliate_active = bool(os.environ.get("EBAY_AFFILIATE_CAMPAIGN_ID", "").strip())
+
+    def _background_search() -> None:
+        """Run the full search pipeline and push SSE events to the queue."""
+        import hashlib as _hashlib
+        import sqlite3 as _sqlite3
+
+        q_norm = q  # captured from outer scope
+        must_exclude_terms = _parse_terms(must_exclude)
+
+        if must_include_mode == "groups" and must_include.strip():
+            or_groups = parse_groups(must_include)
+            ebay_queries = expand_queries(q_norm, or_groups)
+        else:
+            ebay_queries = [q_norm]
+
+        if must_include_mode == "groups" and len(ebay_queries) > 0:
+            comp_query = ebay_queries[0]
+        elif must_include_mode == "all" and must_include.strip():
+            extra = " ".join(_parse_terms(must_include))
+            comp_query = f"{q_norm} {extra}".strip()
+        else:
+            comp_query = q_norm
+
+        base_filters = SearchFilters(
+            max_price=max_price if max_price and max_price > 0 else None,
+            min_price=min_price if min_price and min_price > 0 else None,
+            pages=pages,
+            must_exclude=must_exclude_terms,
+            category_id=category_id.strip() or None,
+        )
+
+        adapter_used = _adapter_name(adapter)
+        q_ref = _update_queues.get(session_id)
+        if q_ref is None:
+            return  # client disconnected before we even started
+
+        def _push(event: dict | None) -> None:
+            """Push an event to the queue; silently drop if session no longer exists."""
+            sq = _update_queues.get(session_id)
+            if sq is not None:
+                sq.put(event)
+
+        try:
+            def _run_search(ebay_query: str) -> list:
+                return _make_adapter(Store(_shared_db), adapter).search(ebay_query, base_filters)
+
+            def _run_comps() -> None:
+                try:
+                    _make_adapter(Store(_shared_db), adapter).get_completed_sales(comp_query, pages)
+                except Exception:
+                    log.warning("async comps: unhandled exception for %r", comp_query, exc_info=True)
+
+            max_workers_inner = min(len(ebay_queries) + 1, 5)
+            with ThreadPoolExecutor(max_workers=max_workers_inner) as ex:
+                comps_future = ex.submit(_run_comps)
+                search_futures = [ex.submit(_run_search, eq) for eq in ebay_queries]
+
+                seen_ids: set[str] = set()
+                listings: list = []
+                for fut in search_futures:
+                    for listing in fut.result():
+                        if listing.platform_listing_id not in seen_ids:
+                            seen_ids.add(listing.platform_listing_id)
+                            listings.append(listing)
+                comps_future.result()
+
+            log.info(
+                "async_search auth=%s tier=%s adapter=%s pages=%d listings=%d q=%r",
+                _auth_label(_user_id), _tier, adapter_used, pages, len(listings), q_norm,
+            )
+
+            shared_store = Store(_shared_db)
+            user_store = Store(_user_db)
+
+            user_store.save_listings(listings)
+
+            seller_ids = list({l.seller_platform_id for l in listings if l.seller_platform_id})
+            n_cat = shared_store.refresh_seller_categories("ebay", seller_ids, listing_store=user_store)
+            if n_cat:
+                log.info("async_search: category history derived for %d sellers", n_cat)
+
+            staged = user_store.get_listings_staged("ebay", [l.platform_listing_id for l in listings])
+            listings = [staged.get(l.platform_listing_id, l) for l in listings]
+
+            _main_adapter = _make_adapter(shared_store, adapter)
+            sellers_needing_age = [
+                l.seller_platform_id for l in listings
+                if l.seller_platform_id
+                and shared_store.get_seller("ebay", l.seller_platform_id) is not None
+                and shared_store.get_seller("ebay", l.seller_platform_id).account_age_days is None
+            ]
+            seen_set: set[str] = set()
+            sellers_needing_age = [s for s in sellers_needing_age if not (s in seen_set or seen_set.add(s))]  # type: ignore[func-returns-value]
+
+            # Stand-in session for _enqueue_vision_tasks, built from the captured fields.
+            _session_stub = CloudUser(
+                user_id=_user_id,
+                tier=_tier,
+                shared_db=_shared_db,
+                user_db=_user_db,
+            )
+            trading_api_enriched = _try_trading_api_enrichment(
+                _main_adapter, sellers_needing_age, _user_db
+            )
+
+            scorer = TrustScorer(shared_store)
+            trust_scores_list = scorer.score_batch(listings, q_norm)
+            user_store.save_trust_scores(trust_scores_list)
+
+            # Enqueue vision tasks for paid+ tiers
+            features_obj = compute_features(_tier)
+            if features_obj.photo_analysis:
+                _enqueue_vision_tasks(listings, trust_scores_list, _session_stub)
+
+            query_hash = _hashlib.md5(comp_query.encode()).hexdigest()
+            comp = shared_store.get_market_comp("ebay", query_hash)
+            market_price = comp.median_price if comp else None
+
+            trust_map = {
+                listing.platform_listing_id: dataclasses.asdict(ts)
+                for listing, ts in zip(listings, trust_scores_list)
+                if ts is not None
+            }
+            seller_map = {
+                listing.seller_platform_id: dataclasses.asdict(
+                    shared_store.get_seller("ebay", listing.seller_platform_id)
+                )
+                for listing in listings
+                if listing.seller_platform_id
+                and shared_store.get_seller("ebay", listing.seller_platform_id)
+            }
+
+            _is_unauthed = _user_id == "anonymous" or _user_id.startswith("guest:")
+            _pref_store = None if _is_unauthed else user_store
+
+            def _get_pref(uid: Optional[str], path: str, default=None):
+                return _pref_store.get_user_preference(path, default=default)  # type: ignore[union-attr]
+
+            def _serialize_listing(l: object) -> dict:
+                d = dataclasses.asdict(l)
+                d["url"] = _wrap_affiliate_url(
+                    d["url"],
+                    retailer="ebay",
+                    user_id=None if _is_unauthed else _user_id,
+                    get_preference=_get_pref if _pref_store is not None else None,
+                )
+                return d
+
+            # Push the initial listings batch
+            _push({
+                "type": "listings",
+                "listings": [_serialize_listing(l) for l in listings],
+                "trust_scores": trust_map,
+                "sellers": seller_map,
+                "market_price": market_price,
+                "adapter_used": adapter_used,
+                "affiliate_active": _affiliate_active,
+                "session_id": session_id,
+            })
+
+            # Kick off background enrichment — it pushes "update" events and the sentinel.
+            _trigger_scraper_enrichment(
+                listings, shared_store, _shared_db,
+                user_db=_user_db, query=comp_query, session_id=session_id,
+                skip_seller_ids=trading_api_enriched,
+            )
+
+        except _sqlite3.OperationalError as e:
+            log.warning("async_search DB contention: %s", e)
+            _push({
+                "type": "listings",
+                "listings": [],
+                "trust_scores": {},
+                "sellers": {},
+                "market_price": None,
+                "adapter_used": adapter_used,
+                "affiliate_active": _affiliate_active,
+                "session_id": session_id,
+            })
+            _push(None)
+        except Exception as e:
+            log.warning("async_search background scrape failed: %s", e, exc_info=True)
+            _push({
+                "type": "error",
+                "message": str(e),
+            })
+            _push(None)
+
+    _search_executor.submit(_background_search)
+    return {"session_id": session_id, "status": "queued"}
+
+
 # ── On-demand enrichment ──────────────────────────────────────────────────────

 @app.post("/api/enrich")
@@ -955,21 +1216,33 @@ def enrich_seller(
 async def stream_updates(session_id: str, request: Request):
     """Server-Sent Events stream for live trust score updates.

-    Opens after a search when any listings have score_is_partial=true.
-    Streams re-scored trust score payloads as enrichment completes, then
-    sends a 'done' event and closes.
+    Used both by the synchronous search endpoint (enrichment-only updates) and
+    the async search endpoint (initial listings + enrichment updates).

-    Each event payload:
+    Event data formats:
+      Enrichment update (legacy / sync search):
         { platform_listing_id, trust_score, seller, market_price }
+      Async search — initial batch:
+        { type: "listings", listings, trust_scores, sellers, market_price,
+          adapter_used, affiliate_active, session_id }
+      Async search — market price resolved after listings:
+        { type: "market_price", market_price }
+      Async search — per-seller enrichment update:
+        { type: "update", platform_listing_id, trust_score, seller, market_price }
+      Error:
+        { type: "error", message }

-    Closes automatically after 90 seconds (worst-case Playwright enrichment).
-    The client should also close on 'done' event.
+    All events are serialised as plain `data:` lines (no named event type).
+    The stream ends with a named `event: done` line.
+
+    Closes automatically after 150 seconds (covers worst-case async scrape + enrichment).
+    The client should also close on the 'done' event.
     """
     if session_id not in _update_queues:
         raise HTTPException(status_code=404, detail="Unknown session_id")

     q = _update_queues[session_id]
-    deadline = asyncio.get_event_loop().time() + 90.0
+    deadline = asyncio.get_event_loop().time() + 150.0
     heartbeat_interval = 15.0
     next_heartbeat = asyncio.get_event_loop().time() + heartbeat_interval

@@ -1335,6 +1608,11 @@ def get_preferences(session: CloudUser = Depends(get_session)) -> dict:
     return store.get_all_preferences()


+_SUPPORTED_CURRENCIES = frozenset({
+    "USD", "GBP", "EUR", "CAD", "AUD", "JPY", "CHF", "MXN", "BRL", "INR",
+})
+
+
 @app.patch("/api/preferences")
 def patch_preference(
     body: PreferenceUpdate,
@@ -1344,6 +1622,7 @@ def patch_preference(

     - ``affiliate.opt_out`` — available to all signed-in users.
     - ``affiliate.byok_ids.ebay`` — Premium tier only.
+    - ``display.currency`` — ISO 4217 code from the supported set.

     Returns the full updated preferences dict.
     """
@@ -1357,6 +1636,15 @@ def patch_preference(
                 status_code=403,
                 detail="Custom affiliate IDs (BYOK) require a Premium subscription.",
             )
+    if body.path == "display.currency":
+        code = str(body.value or "").strip().upper()
+        if code not in _SUPPORTED_CURRENCIES:
+            supported = ", ".join(sorted(_SUPPORTED_CURRENCIES))
+            raise HTTPException(
+                status_code=400,
+                detail=f"Unsupported currency code '{body.value}'. Supported codes: {supported}",
+            )
+        body = PreferenceUpdate(path=body.path, value=code)
     store = Store(session.user_db)
     store.set_user_preference(body.path, body.value)
     return store.get_all_preferences()
diff --git a/tests/test_async_search.py b/tests/test_async_search.py
new file mode 100644
index 0000000..f64d0dc
--- /dev/null
+++ b/tests/test_async_search.py
@@ -0,0 +1,235 @@
+"""Tests for GET /api/search/async (fire-and-forget search + SSE streaming).
+
+Verifies:
+  - Returns HTTP 202 with session_id and status: "queued"
+  - session_id is registered in _update_queues immediately
+  - Actual scraping is not performed (mocked out)
+  - Empty query path returns a completed session with done event
+"""
+from __future__ import annotations
+
+import os
+from pathlib import Path
+from unittest.mock import MagicMock, patch
+
+import pytest
+from fastapi.testclient import TestClient
+
+
+# ── Fixtures ──────────────────────────────────────────────────────────────────
+
+@pytest.fixture
+def client(tmp_path, monkeypatch):
+    """TestClient with a fresh tmp DB. Must set SNIPE_DB *before* importing app."""
+    monkeypatch.setenv("SNIPE_DB", str(tmp_path / "snipe.db"))  # restored on teardown
+    from api.main import app
+    return TestClient(app, raise_server_exceptions=False)
+
+
+def _make_mock_listing():
+    """Return a minimal mock listing object that satisfies the search pipeline."""
+    m = MagicMock()
+    m.platform_listing_id = "123456789"
+    m.seller_platform_id = "test_seller"
+    m.title = "Test GPU"
+    m.price = 100.0
+    m.currency = "USD"
+    m.condition = "Used"
+    m.url = "https://www.ebay.com/itm/123456789"
+    m.photo_urls = []
+    m.listing_age_days = 5
+    m.buying_format = "fixed_price"
+    m.ends_at = None
+    m.fetched_at = None
+    m.trust_score_id = None
+    m.id = 1
+    m.category_name = None
+    return m
+
+
+# ── Core contract tests ───────────────────────────────────────────────────────
+
+def test_async_search_returns_202(client):
+    """GET /api/search/async?q=... returns HTTP 202 with session_id and status."""
+    with (
+        patch("api.main._search_executor"),  # contract-only test: worker never runs
+        patch("api.main._make_adapter") as mock_adapter_factory,
+        patch("api.main._trigger_scraper_enrichment"),
+        patch("api.main.TrustScorer") as mock_scorer_cls,
+    ):
+        mock_adapter = MagicMock()
+        mock_adapter.search.return_value = []
+        mock_adapter.get_completed_sales.return_value = None
+        mock_adapter_factory.return_value = mock_adapter
+
+        mock_scorer = MagicMock()
+        mock_scorer.score_batch.return_value = []
+        mock_scorer_cls.return_value = mock_scorer
+
+        resp = client.get("/api/search/async?q=test+gpu")
+
+    assert resp.status_code == 202
+    data = resp.json()
+    assert "session_id" in data
+    assert data["status"] == "queued"
+    assert isinstance(data["session_id"], str)
+    assert len(data["session_id"]) > 0
+
+
+def test_async_search_registers_session_id(client):
+    """session_id returned by 202 response must appear in _update_queues immediately."""
+    with (
+        patch("api.main._search_executor"),  # contract-only test: worker never runs
+        patch("api.main._make_adapter") as mock_adapter_factory,
+        patch("api.main._trigger_scraper_enrichment"),
+        patch("api.main.TrustScorer") as mock_scorer_cls,
+    ):
+        mock_adapter = MagicMock()
+        mock_adapter.search.return_value = []
+        mock_adapter.get_completed_sales.return_value = None
+        mock_adapter_factory.return_value = mock_adapter
+
+        mock_scorer = MagicMock()
+        mock_scorer.score_batch.return_value = []
+        mock_scorer_cls.return_value = mock_scorer
+
+        resp = client.get("/api/search/async?q=test+gpu")
+
+    assert resp.status_code == 202
+    session_id = resp.json()["session_id"]
+
+    # The queue must be registered so the SSE endpoint can open it.
+    from api.main import _update_queues
+    assert session_id in _update_queues
+
+
+def test_async_search_empty_query(client):
+    """Empty query returns 202 with a pre-loaded done sentinel, no scraping needed."""
+    resp = client.get("/api/search/async?q=")
+    assert resp.status_code == 202
+    data = resp.json()
+    assert data["status"] == "queued"
+    assert "session_id" in data
+
+    from api.main import _update_queues
+    import queue as _queue
+    sid = data["session_id"]
+    assert sid in _update_queues
+    q = _update_queues[sid]
+    # First item should be the empty listings event
+    first = q.get_nowait()
+    assert first is not None
+    assert first["type"] == "listings"
+    assert first["listings"] == []
+    # Second item should be the sentinel
+    sentinel = q.get_nowait()
+    assert sentinel is None
+
+
+def test_async_search_no_real_chromium(client):
+    """Async search endpoint must not launch real Chromium in tests.
+
+    Verifies that the background scraper is submitted to the executor but the
+    adapter factory is patched — no real Playwright/Xvfb process is spawned.
+    Uses a broad patch on Store to avoid sqlite3 DB path issues in the thread pool.
+    """
+    import threading
+    scrape_called = threading.Event()
+
+    def _fake_search(query, filters):
+        scrape_called.set()
+        return []
+
+    with (
+        patch("api.main._make_adapter") as mock_adapter_factory,
+        patch("api.main._trigger_scraper_enrichment"),
+        patch("api.main.TrustScorer") as mock_scorer_cls,
+        patch("api.main.Store") as mock_store_cls,
+    ):
+        mock_adapter = MagicMock()
+        mock_adapter.search.side_effect = _fake_search
+        mock_adapter.get_completed_sales.return_value = None
+        mock_adapter_factory.return_value = mock_adapter
+
+        mock_scorer = MagicMock()
+        mock_scorer.score_batch.return_value = []
+        mock_scorer_cls.return_value = mock_scorer
+
+        mock_store = MagicMock()
+        mock_store.get_listings_staged.return_value = {}
+        mock_store.refresh_seller_categories.return_value = 0
+        mock_store.save_listings.return_value = None
+        mock_store.save_trust_scores.return_value = None
+        mock_store.get_market_comp.return_value = None
+        mock_store.get_seller.return_value = None
+        mock_store.get_user_preference.return_value = None
+        mock_store_cls.return_value = mock_store
+
+        resp = client.get("/api/search/async?q=rtx+3080")
+
+        assert resp.status_code == 202
+        # Wait inside the patch context so the worker still sees the mocked adapter.
+        scrape_called.wait(timeout=5.0)
+        # If we get here without a real Playwright process, the test passes.
+        assert scrape_called.is_set(), "Background search worker never ran"
+
+
+def test_async_search_query_params_forwarded(client):
+    """All filter params accepted by /api/search are also accepted here."""
+    with (
+        patch("api.main._search_executor"),  # contract-only test: worker never runs
+        patch("api.main._make_adapter") as mock_adapter_factory,
+        patch("api.main._trigger_scraper_enrichment"),
+        patch("api.main.TrustScorer") as mock_scorer_cls,
+    ):
+        mock_adapter = MagicMock()
+        mock_adapter.search.return_value = []
+        mock_adapter.get_completed_sales.return_value = None
+        mock_adapter_factory.return_value = mock_adapter
+
+        mock_scorer = MagicMock()
+        mock_scorer.score_batch.return_value = []
+        mock_scorer_cls.return_value = mock_scorer
+
+        resp = client.get(
+            "/api/search/async"
+            "?q=rtx+3080"
+            "&max_price=400"
+            "&min_price=100"
+            "&pages=2"
+            "&must_include=rtx,3080"
+            "&must_include_mode=all"
+            "&must_exclude=mining"
+            "&category_id=27386"
+            "&adapter=auto"
+        )
+
+    assert resp.status_code == 202
+
+
+def test_async_search_session_id_is_uuid(client):
+    """session_id must be a valid UUID v4 string."""
+    import uuid as _uuid
+
+    with (
+        patch("api.main._search_executor"),  # contract-only test: worker never runs
+        patch("api.main._make_adapter") as mock_adapter_factory,
+        patch("api.main._trigger_scraper_enrichment"),
+        patch("api.main.TrustScorer") as mock_scorer_cls,
+    ):
+        mock_adapter = MagicMock()
+        mock_adapter.search.return_value = []
+        mock_adapter.get_completed_sales.return_value = None
+        mock_adapter_factory.return_value = mock_adapter
+
+        mock_scorer = MagicMock()
+        mock_scorer.score_batch.return_value = []
+        mock_scorer_cls.return_value = mock_scorer
+
+        resp = client.get("/api/search/async?q=test")
+
+    assert resp.status_code == 202
+    sid = resp.json()["session_id"]
+    # Should not raise if it's a valid UUID
+    parsed = _uuid.UUID(sid)
+    assert str(parsed) == sid
diff --git a/web/src/stores/search.ts b/web/src/stores/search.ts
index 0f8770e..cb39160 100644
--- a/web/src/stores/search.ts
+++ b/web/src/stores/search.ts
@@ -145,6 +145,7 @@ export const useSearchStore = defineStore('search', () => {
_abort?.abort() _abort = null loading.value = false + closeUpdates() } async function search(q: string, filters: SearchFilters = {}) { @@ -158,8 +159,6 @@ export const useSearchStore = defineStore('search', () => { error.value = null try { - // TODO: POST /api/search with { query: q, filters } - // API does not exist yet — stub returns empty results // VITE_API_BASE is '' in dev; '/snipe' under menagerie (baked at build time by Vite) const apiBase = (import.meta.env.VITE_API_BASE as string) ?? '' const params = new URLSearchParams({ q }) @@ -174,51 +173,36 @@ export const useSearchStore = defineStore('search', () => { if (filters.mustExclude?.trim()) params.set('must_exclude', filters.mustExclude.trim()) if (filters.categoryId?.trim()) params.set('category_id', filters.categoryId.trim()) if (filters.adapter && filters.adapter !== 'auto') params.set('adapter', filters.adapter) - const res = await fetch(`${apiBase}/api/search?${params}`, { signal }) + + // Use the async endpoint: returns 202 immediately with a session_id, then + // streams listings + trust scores via SSE as the scrape completes. + const res = await fetch(`${apiBase}/api/search/async?${params}`, { signal }) if (!res.ok) throw new Error(`Search failed: ${res.status} ${res.statusText}`) const data = await res.json() as { - listings: Listing[] - trust_scores: Record - sellers: Record - market_price: number | null - adapter_used: 'api' | 'scraper' - affiliate_active: boolean - session_id: string | null + session_id: string + status: 'queued' } - results.value = data.listings ?? [] - trustScores.value = new Map(Object.entries(data.trust_scores ?? {})) - sellers.value = new Map(Object.entries(data.sellers ?? {})) - marketPrice.value = data.market_price ?? null - adapterUsed.value = data.adapter_used ?? null - affiliateActive.value = data.affiliate_active ?? false - saveCache({ - query: q, - results: results.value, - trustScores: data.trust_scores ?? {}, - sellers: data.sellers ?? 
{}, - marketPrice: marketPrice.value, - adapterUsed: adapterUsed.value, - }) - - // Open SSE stream if any scores are partial and a session_id was provided - const hasPartial = Object.values(data.trust_scores ?? {}).some(ts => ts.score_is_partial) - if (data.session_id && hasPartial) { - _openUpdates(data.session_id, apiBase) - } + // HTTP 202 received — scraping is underway in the background. + // Stay in loading state until the first "listings" SSE event arrives. + // loading.value stays true; enriching tracks the SSE stream being open. + enriching.value = true + _openUpdates(data.session_id, apiBase) } catch (e) { if (e instanceof DOMException && e.name === 'AbortError') { // User cancelled — clear loading but don't surface as an error results.value = [] + loading.value = false } else { error.value = e instanceof Error ? e.message : 'Unknown error' results.value = [] + loading.value = false } - } finally { - loading.value = false _abort = null } + // Note: loading.value is NOT set to false here — it stays true until the + // first "listings" SSE event arrives (see _openUpdates handler below). 
} function closeUpdates() { @@ -229,34 +213,115 @@ export const useSearchStore = defineStore('search', () => { enriching.value = false } + // Internal type for typed SSE events from the async search endpoint + type _AsyncListingsEvent = { + type: 'listings' + listings: Listing[] + trust_scores: Record + sellers: Record + market_price: number | null + adapter_used: 'api' | 'scraper' + affiliate_active: boolean + session_id: string + } + + type _MarketPriceEvent = { + type: 'market_price' + market_price: number | null + } + + type _UpdateEvent = { + type: 'update' + platform_listing_id: string + trust_score: TrustScore + seller: Seller + market_price: number | null + } + + type _LegacyUpdateEvent = { + platform_listing_id: string + trust_score: TrustScore + seller: Record + market_price: number | null + } + + type _SSEEvent = + | _AsyncListingsEvent + | _MarketPriceEvent + | _UpdateEvent + | _LegacyUpdateEvent + function _openUpdates(sessionId: string, apiBase: string) { - closeUpdates() // close any previous stream - enriching.value = true + // Close any pre-existing stream but preserve enriching state — caller sets it. 
+ if (_sse) { + _sse.close() + _sse = null + } const es = new EventSource(`${apiBase}/api/updates/${sessionId}`) _sse = es es.onmessage = (e) => { try { - const update = JSON.parse(e.data) as { - platform_listing_id: string - trust_score: TrustScore - seller: Record - market_price: number | null - } - if (update.platform_listing_id && update.trust_score) { - trustScores.value = new Map(trustScores.value) - trustScores.value.set(update.platform_listing_id, update.trust_score) - } - if (update.seller) { - const s = update.seller as Seller - if (s.platform_seller_id) { - sellers.value = new Map(sellers.value) - sellers.value.set(s.platform_seller_id, s) + const update = JSON.parse(e.data) as _SSEEvent + + if ('type' in update) { + // Typed events from the async search endpoint + if (update.type === 'listings') { + // First batch: hydrate store and transition out of loading state + results.value = update.listings ?? [] + trustScores.value = new Map(Object.entries(update.trust_scores ?? {})) + sellers.value = new Map(Object.entries(update.sellers ?? {})) + marketPrice.value = update.market_price ?? null + adapterUsed.value = update.adapter_used ?? null + affiliateActive.value = update.affiliate_active ?? false + saveCache({ + query: query.value, + results: results.value, + trustScores: update.trust_scores ?? {}, + sellers: update.sellers ?? {}, + marketPrice: marketPrice.value, + adapterUsed: adapterUsed.value, + }) + // Scrape complete — turn off the initial loading spinner. + // enriching stays true while enrichment SSE is still open. 
+ loading.value = false + } else if (update.type === 'market_price') { + if (update.market_price != null) { + marketPrice.value = update.market_price + } + } else if (update.type === 'update') { + // Per-seller enrichment update (same as legacy format but typed) + if (update.platform_listing_id && update.trust_score) { + trustScores.value = new Map(trustScores.value) + trustScores.value.set(update.platform_listing_id, update.trust_score) + } + if (update.seller?.platform_seller_id) { + sellers.value = new Map(sellers.value) + sellers.value.set(update.seller.platform_seller_id, update.seller) + } + if (update.market_price != null) { + marketPrice.value = update.market_price + } + } + // type: "error" — no special handling; stream will close via 'done' + } else { + // Legacy enrichment update (no type field) from synchronous search path + const legacy = update as _LegacyUpdateEvent + if (legacy.platform_listing_id && legacy.trust_score) { + trustScores.value = new Map(trustScores.value) + trustScores.value.set(legacy.platform_listing_id, legacy.trust_score) + } + if (legacy.seller) { + const s = legacy.seller as Seller + if (s.platform_seller_id) { + sellers.value = new Map(sellers.value) + sellers.value.set(s.platform_seller_id, s) + } + } + if (legacy.market_price != null) { + marketPrice.value = legacy.market_price } - } - if (update.market_price != null) { - marketPrice.value = update.market_price } } catch { // malformed event — ignore @@ -268,6 +333,8 @@ export const useSearchStore = defineStore('search', () => { }) es.onerror = () => { + // If loading is still true (never got a "listings" event), clear it + loading.value = false closeUpdates() } }