From 303b4bfb6f8a5b814a113e8990c91813adf5db70 Mon Sep 17 00:00:00 2001
From: pyr0ball
Date: Sun, 5 Apr 2026 23:12:27 -0700
Subject: [PATCH] feat: SSE live score push for background enrichment (#1)
After a search, the API now returns a session_id. If any trust scores are
partial (pending seller age or category data), the frontend opens a
Server-Sent Events stream to /api/updates/{session_id}. As the background
BTF (account age) and category enrichment passes complete (both run
sequentially in a single daemon thread), the thread re-scores affected
listings and pushes updated TrustScore payloads over SSE. The
frontend patches the trustScores and sellers maps reactively so signal
dots light up without requiring a manual re-search.
Backend:
- _update_queues registry maps session_id -> SimpleQueue (thread-safe bridge)
- _trigger_scraper_enrichment accepts session_id/user_db/query, builds a
seller->listings map, calls _push_updates() after each enrichment pass
which re-scores, saves trust scores, and puts events on the queue
- New GET /api/updates/{session_id} SSE endpoint: polls queue every 500ms,
emits heartbeats every 15s, closes on sentinel None or 90s timeout
- search endpoint generates session_id and returns it in response
Frontend:
- search store adds enriching state and _openUpdates() / closeUpdates()
- On search completion, if partial scores exist, opens EventSource stream
- onmessage: patches trustScores and sellers maps (new Map() to trigger
Vue reactivity), updates marketPrice if included
- on 'done' event or error: closes stream, enriching = false
- SearchView: pulsing 'Updating scores...' badge in toolbar while enriching
---
api/main.py | 156 +++++++++++++++++++++++++++++++----
web/src/stores/search.ts | 62 ++++++++++++++
web/src/views/SearchView.vue | 32 +++++++
3 files changed, 236 insertions(+), 14 deletions(-)
diff --git a/api/main.py b/api/main.py
index d4ae265..e5fd187 100644
--- a/api/main.py
+++ b/api/main.py
@@ -3,16 +3,20 @@ from __future__ import annotations
import dataclasses
import hashlib
+import json as _json
import logging
import os
+import queue as _queue
+import uuid
from concurrent.futures import ThreadPoolExecutor
from contextlib import asynccontextmanager
from pathlib import Path
+import asyncio
import csv
import io
-from fastapi import Depends, FastAPI, HTTPException, UploadFile, File
+from fastapi import Depends, FastAPI, HTTPException, Request, UploadFile, File
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from fastapi.middleware.cors import CORSMiddleware
@@ -34,6 +38,12 @@ from api.ebay_webhook import router as ebay_webhook_router
load_env(Path(".env"))
log = logging.getLogger(__name__)
+# ── SSE update registry ───────────────────────────────────────────────────────
+# Maps session_id → SimpleQueue of update events.
+# SimpleQueue is always thread-safe; no asyncio loop needed to write from threads.
+# Keys are cleaned up when the SSE stream ends (client disconnect or timeout).
+_update_queues: dict[str, _queue.SimpleQueue] = {}
+
@asynccontextmanager
async def _lifespan(app: FastAPI):
@@ -109,31 +119,34 @@ def _trigger_scraper_enrichment(
listings: list,
shared_store: Store,
shared_db: Path,
+ user_db: Path | None = None,
+ query: str = "",
+ session_id: str | None = None,
) -> None:
"""Fire-and-forget background enrichment for missing seller signals.
- Two enrichment passes run concurrently in the same daemon thread:
+ Two enrichment passes run in the same daemon thread:
1. BTF (/itm/ pages) — fills account_age_days for sellers where it is None.
2. _ssn search pages — fills category_history_json for sellers with no history.
- The main response returns immediately; enriched data lands in the DB for
- future searches. Uses ScrapedEbayAdapter's Playwright stack regardless of
- which adapter was used for the main search (Shopping API handles age for
- the API adapter inline; BTF is the fallback for no-creds / scraper mode).
+ When session_id is provided, pushes re-scored trust score updates to the
+ SSE queue after each pass so the frontend can update scores live.
shared_store: used for pre-flight seller checks (same-thread reads).
- shared_db: path passed to background thread — it creates its own Store
- (sqlite3 connections are not thread-safe).
+ shared_db: path passed to background thread (sqlite3 is not thread-safe).
+ user_db: path to per-user listings/trust_scores DB (same as shared_db in local mode).
+ query: original search query — used for market comp lookup during re-score.
+ session_id: SSE session key; if set, updates are pushed to _update_queues[session_id].
"""
- # Caps per search: limits Playwright sessions launched in the background so we
- # don't hammer Kasada or spin up dozens of Xvfb instances after a large search.
- # Remaining sellers get enriched incrementally on subsequent searches.
_BTF_MAX_PER_SEARCH = 3
_CAT_MAX_PER_SEARCH = 3
needs_btf: dict[str, str] = {}
needs_categories: list[str] = []
+ # Map seller_id → [listings] for this search so we know what to re-score
+ seller_listing_map: dict[str, list] = {}
+
for listing in listings:
sid = listing.seller_platform_id
if not sid:
@@ -141,6 +154,7 @@ def _trigger_scraper_enrichment(
seller = shared_store.get_seller("ebay", sid)
if not seller:
continue
+ seller_listing_map.setdefault(sid, []).append(listing)
if ((seller.account_age_days is None or seller.feedback_count == 0)
and sid not in needs_btf
and len(needs_btf) < _BTF_MAX_PER_SEARCH):
@@ -151,6 +165,8 @@ def _trigger_scraper_enrichment(
needs_categories.append(sid)
if not needs_btf and not needs_categories:
+ if session_id and session_id in _update_queues:
+ _update_queues[session_id].put(None) # sentinel — nothing to enrich
return
log.info(
@@ -158,17 +174,55 @@ def _trigger_scraper_enrichment(
len(needs_btf), len(needs_categories),
)
+ def _push_updates(enriched_seller_ids: list[str]) -> None:
+ """Re-score listings for enriched sellers and push updates to SSE queue."""
+ if not session_id or session_id not in _update_queues:
+ return
+ q = _update_queues[session_id]
+ thread_shared = Store(shared_db)
+ thread_user = Store(user_db or shared_db)
+ scorer = TrustScorer(thread_shared)
+ comp = thread_shared.get_market_comp("ebay", hashlib.md5(query.encode()).hexdigest())
+ market_price = comp.median_price if comp else None
+ for sid in enriched_seller_ids:
+ seller = thread_shared.get_seller("ebay", sid)
+ if not seller:
+ continue
+ affected = seller_listing_map.get(sid, [])
+ if not affected:
+ continue
+ new_scores = scorer.score_batch(affected, query)
+ thread_user.save_trust_scores(new_scores)
+ for listing, ts in zip(affected, new_scores):
+ if ts is None:
+ continue
+ q.put({
+ "platform_listing_id": listing.platform_listing_id,
+ "trust_score": dataclasses.asdict(ts),
+ "seller": dataclasses.asdict(seller),
+ "market_price": market_price,
+ })
+
def _run():
try:
enricher = ScrapedEbayAdapter(Store(shared_db))
if needs_btf:
enricher.enrich_sellers_btf(needs_btf, max_workers=2)
log.info("BTF enrichment complete for %d sellers", len(needs_btf))
+ _push_updates(list(needs_btf.keys()))
if needs_categories:
enricher.enrich_sellers_categories(needs_categories, max_workers=2)
log.info("Category enrichment complete for %d sellers", len(needs_categories))
+ # Re-score only sellers not already covered by BTF push
+ cat_only = [s for s in needs_categories if s not in needs_btf]
+ if cat_only:
+ _push_updates(cat_only)
except Exception as e:
log.warning("Scraper enrichment failed: %s", e)
+ finally:
+ # Sentinel: tells SSE stream the enrichment thread is done
+ if session_id and session_id in _update_queues:
+ _update_queues[session_id].put(None)
import threading
t = threading.Thread(target=_run, daemon=True)
@@ -365,9 +419,15 @@ def search(
listings = [staged.get(l.platform_listing_id, l) for l in listings]
# BTF enrichment: scrape /itm/ pages for sellers missing account_age_days.
- # Runs in the background so it doesn't delay the response; next search of
- # the same sellers will have full scores.
- _trigger_scraper_enrichment(listings, shared_store, shared_db)
+ # Runs in the background so it doesn't delay the response. A session_id is
+ # generated so the frontend can open an SSE stream and receive live score
+ # updates as enrichment completes.
+ session_id = str(uuid.uuid4())
+ _update_queues[session_id] = _queue.SimpleQueue()
+ _trigger_scraper_enrichment(
+ listings, shared_store, shared_db,
+ user_db=user_db, query=q, session_id=session_id,
+ )
scorer = TrustScorer(shared_store)
trust_scores_list = scorer.score_batch(listings, q)
@@ -411,6 +471,7 @@ def search(
"market_price": market_price,
"adapter_used": adapter_used,
"affiliate_active": bool(os.environ.get("EBAY_AFFILIATE_CAMPAIGN_ID", "").strip()),
+ "session_id": session_id,
}
@@ -508,6 +569,73 @@ def enrich_seller(
}
+# ── SSE live score updates ────────────────────────────────────────────────────
+
+@app.get("/api/updates/{session_id}")
+async def stream_updates(session_id: str, request: Request):
+ """Server-Sent Events stream for live trust score updates.
+
+ Opens after a search when any listings have score_is_partial=true.
+ Streams re-scored trust score payloads as enrichment completes, then
+ sends a 'done' event and closes.
+
+ Each event payload:
+ { platform_listing_id, trust_score, seller, market_price }
+
+ Closes automatically after 90 seconds (worst-case Playwright enrichment).
+ The client should also close on 'done' event.
+ """
+ if session_id not in _update_queues:
+ raise HTTPException(status_code=404, detail="Unknown session_id")
+
+ q = _update_queues[session_id]
+ deadline = asyncio.get_event_loop().time() + 90.0
+ heartbeat_interval = 15.0
+ next_heartbeat = asyncio.get_event_loop().time() + heartbeat_interval
+
+ async def generate():
+ nonlocal next_heartbeat
+ try:
+ while asyncio.get_event_loop().time() < deadline:
+ if await request.is_disconnected():
+ break
+
+ # Drain all available updates (non-blocking)
+ while True:
+ try:
+ item = q.get_nowait()
+ except _queue.Empty:
+ break
+ if item is None:
+ # Sentinel: enrichment thread is done
+ yield "event: done\ndata: {}\n\n"
+ return
+ yield f"data: {_json.dumps(item)}\n\n"
+
+ # Heartbeat to keep the connection alive through proxies
+ now = asyncio.get_event_loop().time()
+ if now >= next_heartbeat:
+ yield ": heartbeat\n\n"
+ next_heartbeat = now + heartbeat_interval
+
+ await asyncio.sleep(0.5)
+
+ # Timeout reached
+ yield "event: done\ndata: {\"reason\": \"timeout\"}\n\n"
+ finally:
+ _update_queues.pop(session_id, None)
+
+ return StreamingResponse(
+ generate(),
+ media_type="text/event-stream",
+ headers={
+ "Cache-Control": "no-cache",
+ "X-Accel-Buffering": "no", # nginx: disable proxy buffering for SSE
+ "Connection": "keep-alive",
+ },
+ )
+
+
# ── Saved Searches ────────────────────────────────────────────────────────────
class SavedSearchCreate(BaseModel):
diff --git a/web/src/stores/search.ts b/web/src/stores/search.ts
index 2269e76..3fca4f8 100644
--- a/web/src/stores/search.ts
+++ b/web/src/stores/search.ts
@@ -123,8 +123,10 @@ export const useSearchStore = defineStore('search', () => {
const affiliateActive = ref(false)
const loading = ref(false)
const error = ref(null)
+ const enriching = ref(false) // true while SSE stream is open
let _abort: AbortController | null = null
+ let _sse: EventSource | null = null
function cancelSearch() {
_abort?.abort()
@@ -166,6 +168,7 @@ export const useSearchStore = defineStore('search', () => {
market_price: number | null
adapter_used: 'api' | 'scraper'
affiliate_active: boolean
+ session_id: string | null
}
results.value = data.listings ?? []
@@ -182,6 +185,12 @@ export const useSearchStore = defineStore('search', () => {
marketPrice: marketPrice.value,
adapterUsed: adapterUsed.value,
})
+
+ // Open SSE stream if any scores are partial and a session_id was provided
+ const hasPartial = Object.values(data.trust_scores ?? {}).some(ts => ts.score_is_partial)
+ if (data.session_id && hasPartial) {
+ _openUpdates(data.session_id, apiBase)
+ }
} catch (e) {
if (e instanceof DOMException && e.name === 'AbortError') {
// User cancelled — clear loading but don't surface as an error
@@ -196,6 +205,57 @@ export const useSearchStore = defineStore('search', () => {
}
}
+ function closeUpdates() {
+ if (_sse) {
+ _sse.close()
+ _sse = null
+ }
+ enriching.value = false
+ }
+
+ function _openUpdates(sessionId: string, apiBase: string) {
+ closeUpdates() // close any previous stream
+ enriching.value = true
+
+ const es = new EventSource(`${apiBase}/api/updates/${sessionId}`)
+ _sse = es
+
+ es.onmessage = (e) => {
+ try {
+ const update = JSON.parse(e.data) as {
+ platform_listing_id: string
+ trust_score: TrustScore
+ seller: Record
+ market_price: number | null
+ }
+ if (update.platform_listing_id && update.trust_score) {
+ trustScores.value = new Map(trustScores.value)
+ trustScores.value.set(update.platform_listing_id, update.trust_score)
+ }
+ if (update.seller) {
+ const s = update.seller as Seller
+ if (s.platform_seller_id) {
+ sellers.value = new Map(sellers.value)
+ sellers.value.set(s.platform_seller_id, s)
+ }
+ }
+ if (update.market_price != null) {
+ marketPrice.value = update.market_price
+ }
+ } catch {
+ // malformed event — ignore
+ }
+ }
+
+ es.addEventListener('done', () => {
+ closeUpdates()
+ })
+
+ es.onerror = () => {
+ closeUpdates()
+ }
+ }
+
async function enrichSeller(sellerUsername: string, listingId: string): Promise {
const apiBase = (import.meta.env.VITE_API_BASE as string) ?? ''
const params = new URLSearchParams({
@@ -230,10 +290,12 @@ export const useSearchStore = defineStore('search', () => {
adapterUsed,
affiliateActive,
loading,
+ enriching,
error,
search,
cancelSearch,
enrichSeller,
+ closeUpdates,
clearResults,
}
})
diff --git a/web/src/views/SearchView.vue b/web/src/views/SearchView.vue
index 9557693..3ffc87e 100644
--- a/web/src/views/SearchView.vue
+++ b/web/src/views/SearchView.vue
@@ -312,6 +312,11 @@
+
+
+
+ Updating scores…
+