feat: SSE live score push for background enrichment (#1)

After a search, the API now returns a session_id. If any trust scores are
partial (pending seller age or category data), the frontend opens a
Server-Sent Events stream to /api/updates/{session_id}. As the background
BTF (account age) and category enrichment threads complete, they re-score
affected listings and push updated TrustScore payloads over SSE. The
frontend patches the trustScores and sellers maps reactively so signal
dots light up without requiring a manual re-search.

Backend:
- _update_queues registry maps session_id -> SimpleQueue (thread-safe bridge)
- _trigger_scraper_enrichment accepts session_id/user_db/query, builds a
  seller->listings map, calls _push_updates() after each enrichment pass
  which re-scores, saves trust scores, and puts events on the queue
- New GET /api/updates/{session_id} SSE endpoint: polls queue every 500ms,
  emits heartbeats every 15s, closes on sentinel None or 90s timeout
- search endpoint generates session_id and returns it in response

Frontend:
- search store adds enriching state and _openUpdates() / closeUpdates()
- On search completion, if partial scores exist, opens EventSource stream
- onmessage: patches trustScores and sellers maps (new Map() to trigger
  Vue reactivity), updates marketPrice if included
- on 'done' event or error: closes stream, enriching = false
- SearchView: pulsing 'Updating scores...' badge in toolbar while enriching
This commit is contained in:
pyr0ball 2026-04-05 23:12:27 -07:00
parent 45c758bb53
commit 303b4bfb6f
3 changed files with 236 additions and 14 deletions

View file

@ -3,16 +3,20 @@ from __future__ import annotations
import dataclasses
import hashlib
import json as _json
import logging
import os
import queue as _queue
import uuid
from concurrent.futures import ThreadPoolExecutor
from contextlib import asynccontextmanager
from pathlib import Path
import asyncio
import csv
import io
from fastapi import Depends, FastAPI, HTTPException, UploadFile, File
from fastapi import Depends, FastAPI, HTTPException, Request, UploadFile, File
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from fastapi.middleware.cors import CORSMiddleware
@ -34,6 +38,12 @@ from api.ebay_webhook import router as ebay_webhook_router
load_env(Path(".env"))
log = logging.getLogger(__name__)
# ── SSE update registry ───────────────────────────────────────────────────────
# Maps session_id → SimpleQueue of update events.
# SimpleQueue is always thread-safe; no asyncio loop needed to write from threads.
# Keys are cleaned up when the SSE stream ends (client disconnect or timeout).
_update_queues: dict[str, _queue.SimpleQueue] = {}
@asynccontextmanager
async def _lifespan(app: FastAPI):
@ -109,31 +119,34 @@ def _trigger_scraper_enrichment(
listings: list,
shared_store: Store,
shared_db: Path,
user_db: Path | None = None,
query: str = "",
session_id: str | None = None,
) -> None:
"""Fire-and-forget background enrichment for missing seller signals.
Two enrichment passes run concurrently in the same daemon thread:
Two enrichment passes run in the same daemon thread:
1. BTF (/itm/ pages) fills account_age_days for sellers where it is None.
2. _ssn search pages fills category_history_json for sellers with no history.
The main response returns immediately; enriched data lands in the DB for
future searches. Uses ScrapedEbayAdapter's Playwright stack regardless of
which adapter was used for the main search (Shopping API handles age for
the API adapter inline; BTF is the fallback for no-creds / scraper mode).
When session_id is provided, pushes re-scored trust score updates to the
SSE queue after each pass so the frontend can update scores live.
shared_store: used for pre-flight seller checks (same-thread reads).
shared_db: path passed to background thread it creates its own Store
(sqlite3 connections are not thread-safe).
shared_db: path passed to background thread (sqlite3 is not thread-safe).
user_db: path to per-user listings/trust_scores DB (same as shared_db in local mode).
query: original search query used for market comp lookup during re-score.
session_id: SSE session key; if set, updates are pushed to _update_queues[session_id].
"""
# Caps per search: limits Playwright sessions launched in the background so we
# don't hammer Kasada or spin up dozens of Xvfb instances after a large search.
# Remaining sellers get enriched incrementally on subsequent searches.
_BTF_MAX_PER_SEARCH = 3
_CAT_MAX_PER_SEARCH = 3
needs_btf: dict[str, str] = {}
needs_categories: list[str] = []
# Map seller_id → [listings] for this search so we know what to re-score
seller_listing_map: dict[str, list] = {}
for listing in listings:
sid = listing.seller_platform_id
if not sid:
@ -141,6 +154,7 @@ def _trigger_scraper_enrichment(
seller = shared_store.get_seller("ebay", sid)
if not seller:
continue
seller_listing_map.setdefault(sid, []).append(listing)
if ((seller.account_age_days is None or seller.feedback_count == 0)
and sid not in needs_btf
and len(needs_btf) < _BTF_MAX_PER_SEARCH):
@ -151,6 +165,8 @@ def _trigger_scraper_enrichment(
needs_categories.append(sid)
if not needs_btf and not needs_categories:
if session_id and session_id in _update_queues:
_update_queues[session_id].put(None) # sentinel — nothing to enrich
return
log.info(
@ -158,17 +174,55 @@ def _trigger_scraper_enrichment(
len(needs_btf), len(needs_categories),
)
def _push_updates(enriched_seller_ids: list[str]) -> None:
    """Re-score listings for enriched sellers and push updates to SSE queue."""
    # No-op unless live updates were requested and the stream is still registered.
    if not session_id or session_id not in _update_queues:
        return
    out_queue = _update_queues[session_id]
    # Open fresh Store handles: this runs on the enrichment thread and sqlite3
    # connections must not be shared across threads.
    bg_shared = Store(shared_db)
    bg_user = Store(user_db or shared_db)
    scorer = TrustScorer(bg_shared)
    market_comp = bg_shared.get_market_comp("ebay", hashlib.md5(query.encode()).hexdigest())
    market_price = market_comp.median_price if market_comp else None
    for seller_id in enriched_seller_ids:
        seller = bg_shared.get_seller("ebay", seller_id)
        seller_listings = seller_listing_map.get(seller_id, [])
        # Skip sellers that vanished from the DB or had no listings in this search.
        if not seller or not seller_listings:
            continue
        rescored = scorer.score_batch(seller_listings, query)
        bg_user.save_trust_scores(rescored)
        for listing, score in zip(seller_listings, rescored):
            if score is None:
                continue
            out_queue.put({
                "platform_listing_id": listing.platform_listing_id,
                "trust_score": dataclasses.asdict(score),
                "seller": dataclasses.asdict(seller),
                "market_price": market_price,
            })
def _run():
    """Daemon-thread body: run both enrichment passes, then signal completion."""
    try:
        # The thread opens its own Store — sqlite3 connections are not thread-safe.
        adapter = ScrapedEbayAdapter(Store(shared_db))
        if needs_btf:
            adapter.enrich_sellers_btf(needs_btf, max_workers=2)
            log.info("BTF enrichment complete for %d sellers", len(needs_btf))
            _push_updates(list(needs_btf.keys()))
        if needs_categories:
            adapter.enrich_sellers_categories(needs_categories, max_workers=2)
            log.info("Category enrichment complete for %d sellers", len(needs_categories))
            # The BTF pass already pushed re-scores for its sellers; push the rest.
            remaining = [sid for sid in needs_categories if sid not in needs_btf]
            if remaining:
                _push_updates(remaining)
    except Exception as exc:
        log.warning("Scraper enrichment failed: %s", exc)
    finally:
        # Always deliver the sentinel so the SSE stream can close promptly.
        if session_id and session_id in _update_queues:
            _update_queues[session_id].put(None)
import threading
t = threading.Thread(target=_run, daemon=True)
@ -365,9 +419,15 @@ def search(
listings = [staged.get(l.platform_listing_id, l) for l in listings]
# BTF enrichment: scrape /itm/ pages for sellers missing account_age_days.
# Runs in the background so it doesn't delay the response; next search of
# the same sellers will have full scores.
_trigger_scraper_enrichment(listings, shared_store, shared_db)
# Runs in the background so it doesn't delay the response. A session_id is
# generated so the frontend can open an SSE stream and receive live score
# updates as enrichment completes.
session_id = str(uuid.uuid4())
_update_queues[session_id] = _queue.SimpleQueue()
_trigger_scraper_enrichment(
listings, shared_store, shared_db,
user_db=user_db, query=q, session_id=session_id,
)
scorer = TrustScorer(shared_store)
trust_scores_list = scorer.score_batch(listings, q)
@ -411,6 +471,7 @@ def search(
"market_price": market_price,
"adapter_used": adapter_used,
"affiliate_active": bool(os.environ.get("EBAY_AFFILIATE_CAMPAIGN_ID", "").strip()),
"session_id": session_id,
}
@ -508,6 +569,73 @@ def enrich_seller(
}
# ── SSE live score updates ────────────────────────────────────────────────────
@app.get("/api/updates/{session_id}")
async def stream_updates(session_id: str, request: Request):
    """Server-Sent Events stream for live trust score updates.

    Opens after a search when any listings have score_is_partial=true.
    Streams re-scored trust score payloads as enrichment completes, then
    sends a 'done' event and closes.

    Each event payload:
        { platform_listing_id, trust_score, seller, market_price }

    Closes automatically after 90 seconds (worst-case Playwright enrichment).
    The client should also close on 'done' event.

    Raises:
        HTTPException: 404 when session_id has no registered update queue.
    """
    if session_id not in _update_queues:
        raise HTTPException(status_code=404, detail="Unknown session_id")
    q = _update_queues[session_id]
    # Hoist the loop once; asyncio.get_event_loop() is deprecated inside
    # coroutines in favor of get_running_loop(), and loop.time() gives a
    # monotonic clock for deadline/heartbeat bookkeeping.
    loop = asyncio.get_running_loop()
    deadline = loop.time() + 90.0
    heartbeat_interval = 15.0
    next_heartbeat = loop.time() + heartbeat_interval

    async def generate():
        nonlocal next_heartbeat
        try:
            while loop.time() < deadline:
                if await request.is_disconnected():
                    # Client went away — exit without yielding anything further
                    # (previously this fell through and emitted a bogus
                    # 'timeout' done event to a closed connection).
                    return
                # Drain all available updates (non-blocking)
                while True:
                    try:
                        item = q.get_nowait()
                    except _queue.Empty:
                        break
                    if item is None:
                        # Sentinel: enrichment thread is done
                        yield "event: done\ndata: {}\n\n"
                        return
                    yield f"data: {_json.dumps(item)}\n\n"
                # Heartbeat to keep the connection alive through proxies
                now = loop.time()
                if now >= next_heartbeat:
                    yield ": heartbeat\n\n"
                    next_heartbeat = now + heartbeat_interval
                await asyncio.sleep(0.5)
            # Timeout reached
            yield "event: done\ndata: {\"reason\": \"timeout\"}\n\n"
        finally:
            # Registry cleanup runs on every exit path (done, timeout, disconnect).
            _update_queues.pop(session_id, None)

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # nginx: disable proxy buffering for SSE
            "Connection": "keep-alive",
        },
    )
# ── Saved Searches ────────────────────────────────────────────────────────────
class SavedSearchCreate(BaseModel):

View file

@ -123,8 +123,10 @@ export const useSearchStore = defineStore('search', () => {
const affiliateActive = ref<boolean>(false)
const loading = ref(false)
const error = ref<string | null>(null)
const enriching = ref(false) // true while SSE stream is open
let _abort: AbortController | null = null
let _sse: EventSource | null = null
function cancelSearch() {
_abort?.abort()
@ -166,6 +168,7 @@ export const useSearchStore = defineStore('search', () => {
market_price: number | null
adapter_used: 'api' | 'scraper'
affiliate_active: boolean
session_id: string | null
}
results.value = data.listings ?? []
@ -182,6 +185,12 @@ export const useSearchStore = defineStore('search', () => {
marketPrice: marketPrice.value,
adapterUsed: adapterUsed.value,
})
// Open SSE stream if any scores are partial and a session_id was provided
const hasPartial = Object.values(data.trust_scores ?? {}).some(ts => ts.score_is_partial)
if (data.session_id && hasPartial) {
_openUpdates(data.session_id, apiBase)
}
} catch (e) {
if (e instanceof DOMException && e.name === 'AbortError') {
// User cancelled — clear loading but don't surface as an error
@ -196,6 +205,57 @@ export const useSearchStore = defineStore('search', () => {
}
}
function closeUpdates() {
  // Tear down any open SSE connection and clear the live-update flag.
  _sse?.close()
  _sse = null
  enriching.value = false
}
function _openUpdates(sessionId: string, apiBase: string) {
closeUpdates() // close any previous stream
enriching.value = true
const es = new EventSource(`${apiBase}/api/updates/${sessionId}`)
_sse = es
es.onmessage = (e) => {
try {
const update = JSON.parse(e.data) as {
platform_listing_id: string
trust_score: TrustScore
seller: Record<string, unknown>
market_price: number | null
}
if (update.platform_listing_id && update.trust_score) {
trustScores.value = new Map(trustScores.value)
trustScores.value.set(update.platform_listing_id, update.trust_score)
}
if (update.seller) {
const s = update.seller as Seller
if (s.platform_seller_id) {
sellers.value = new Map(sellers.value)
sellers.value.set(s.platform_seller_id, s)
}
}
if (update.market_price != null) {
marketPrice.value = update.market_price
}
} catch {
// malformed event — ignore
}
}
es.addEventListener('done', () => {
closeUpdates()
})
es.onerror = () => {
closeUpdates()
}
}
async function enrichSeller(sellerUsername: string, listingId: string): Promise<void> {
const apiBase = (import.meta.env.VITE_API_BASE as string) ?? ''
const params = new URLSearchParams({
@ -230,10 +290,12 @@ export const useSearchStore = defineStore('search', () => {
adapterUsed,
affiliateActive,
loading,
enriching,
error,
search,
cancelSearch,
enrichSeller,
closeUpdates,
clearResults,
}
})

View file

@ -312,6 +312,11 @@
</span>
</p>
<div class="toolbar-actions">
<!-- Live enrichment indicator visible while SSE stream is open -->
<span v-if="store.enriching" class="enriching-badge" aria-live="polite" title="Scores updating as seller data arrives">
<span class="enriching-dot" aria-hidden="true"></span>
Updating scores
</span>
<label for="sort-select" class="sr-only">Sort by</label>
<select id="sort-select" v-model="sortBy" class="sort-select">
<option v-for="opt in SORT_OPTIONS" :key="opt.value" :value="opt.value">
@ -1079,6 +1084,33 @@ async function onSearch() {
flex-wrap: wrap;
}
.enriching-badge {
display: inline-flex;
align-items: center;
gap: var(--space-1);
padding: var(--space-1) var(--space-2);
background: color-mix(in srgb, var(--app-primary) 10%, transparent);
border: 1px solid color-mix(in srgb, var(--app-primary) 30%, transparent);
border-radius: var(--radius-full, 9999px);
color: var(--app-primary);
font-size: 0.75rem;
font-weight: 500;
white-space: nowrap;
}
.enriching-dot {
width: 6px;
height: 6px;
border-radius: 50%;
background: var(--app-primary);
animation: enriching-pulse 1.2s ease-in-out infinite;
}
@keyframes enriching-pulse {
0%, 100% { opacity: 1; transform: scale(1); }
50% { opacity: 0.4; transform: scale(0.7); }
}
.save-btn {
display: flex;
align-items: center;