diff --git a/api/main.py b/api/main.py index d4ae265..e5fd187 100644 --- a/api/main.py +++ b/api/main.py @@ -3,16 +3,20 @@ from __future__ import annotations import dataclasses import hashlib +import json as _json import logging import os +import queue as _queue +import uuid from concurrent.futures import ThreadPoolExecutor from contextlib import asynccontextmanager from pathlib import Path +import asyncio import csv import io -from fastapi import Depends, FastAPI, HTTPException, UploadFile, File +from fastapi import Depends, FastAPI, HTTPException, Request, UploadFile, File from fastapi.responses import StreamingResponse from pydantic import BaseModel from fastapi.middleware.cors import CORSMiddleware @@ -34,6 +38,12 @@ from api.ebay_webhook import router as ebay_webhook_router load_env(Path(".env")) log = logging.getLogger(__name__) +# ── SSE update registry ─────────────────────────────────────────────────────── +# Maps session_id → SimpleQueue of update events. +# SimpleQueue is always thread-safe; no asyncio loop needed to write from threads. +# Keys are cleaned up when the SSE stream ends (client disconnect or timeout). +_update_queues: dict[str, _queue.SimpleQueue] = {} + @asynccontextmanager async def _lifespan(app: FastAPI): @@ -109,31 +119,34 @@ def _trigger_scraper_enrichment( listings: list, shared_store: Store, shared_db: Path, + user_db: Path | None = None, + query: str = "", + session_id: str | None = None, ) -> None: """Fire-and-forget background enrichment for missing seller signals. - Two enrichment passes run concurrently in the same daemon thread: + Two enrichment passes run in the same daemon thread: 1. BTF (/itm/ pages) — fills account_age_days for sellers where it is None. 2. _ssn search pages — fills category_history_json for sellers with no history. - The main response returns immediately; enriched data lands in the DB for - future searches. Uses ScrapedEbayAdapter's Playwright stack regardless of - which adapter was used for the main search (Shopping API handles age for - the API adapter inline; BTF is the fallback for no-creds / scraper mode). + When session_id is provided, pushes re-scored trust score updates to the + SSE queue after each pass so the frontend can update scores live. shared_store: used for pre-flight seller checks (same-thread reads). - shared_db: path passed to background thread — it creates its own Store - (sqlite3 connections are not thread-safe). + shared_db: path passed to background thread (sqlite3 is not thread-safe). + user_db: path to per-user listings/trust_scores DB (same as shared_db in local mode). + query: original search query — used for market comp lookup during re-score. + session_id: SSE session key; if set, updates are pushed to _update_queues[session_id]. """ - # Caps per search: limits Playwright sessions launched in the background so we - # don't hammer Kasada or spin up dozens of Xvfb instances after a large search. - # Remaining sellers get enriched incrementally on subsequent searches. _BTF_MAX_PER_SEARCH = 3 _CAT_MAX_PER_SEARCH = 3 needs_btf: dict[str, str] = {} needs_categories: list[str] = [] + # Map seller_id → [listings] for this search so we know what to re-score + seller_listing_map: dict[str, list] = {} + for listing in listings: sid = listing.seller_platform_id if not sid: @@ -141,6 +154,7 @@ def _trigger_scraper_enrichment( seller = shared_store.get_seller("ebay", sid) if not seller: continue + seller_listing_map.setdefault(sid, []).append(listing) if ((seller.account_age_days is None or seller.feedback_count == 0) and sid not in needs_btf and len(needs_btf) < _BTF_MAX_PER_SEARCH): @@ -151,6 +165,8 @@ def _trigger_scraper_enrichment( needs_categories.append(sid) if not needs_btf and not needs_categories: + if session_id and session_id in _update_queues: + _update_queues[session_id].put(None) # sentinel — nothing to enrich return log.info( @@ -158,17 +174,55 @@ def _trigger_scraper_enrichment( len(needs_btf), len(needs_categories), ) + def _push_updates(enriched_seller_ids: list[str]) -> None: + """Re-score listings for enriched sellers and push updates to SSE queue.""" + if not session_id or session_id not in _update_queues: + return + q = _update_queues[session_id] + thread_shared = Store(shared_db) + thread_user = Store(user_db or shared_db) + scorer = TrustScorer(thread_shared) + comp = thread_shared.get_market_comp("ebay", hashlib.md5(query.encode()).hexdigest()) + market_price = comp.median_price if comp else None + for sid in enriched_seller_ids: + seller = thread_shared.get_seller("ebay", sid) + if not seller: + continue + affected = seller_listing_map.get(sid, []) + if not affected: + continue + new_scores = scorer.score_batch(affected, query) + thread_user.save_trust_scores(new_scores) + for listing, ts in zip(affected, new_scores): + if ts is None: + continue + q.put({ + "platform_listing_id": listing.platform_listing_id, + "trust_score": dataclasses.asdict(ts), + "seller": dataclasses.asdict(seller), + "market_price": market_price, + }) + def _run(): try: enricher = ScrapedEbayAdapter(Store(shared_db)) if needs_btf: enricher.enrich_sellers_btf(needs_btf, max_workers=2) log.info("BTF enrichment complete for %d sellers", len(needs_btf)) + _push_updates(list(needs_btf.keys())) if needs_categories: enricher.enrich_sellers_categories(needs_categories, max_workers=2) log.info("Category enrichment complete for %d sellers", len(needs_categories)) + # Re-score only sellers not already covered by BTF push + cat_only = [s for s in needs_categories if s not in needs_btf] + if cat_only: + _push_updates(cat_only) except Exception as e: log.warning("Scraper enrichment failed: %s", e) + finally: + # Sentinel: tells SSE stream the enrichment thread is done + if session_id and session_id in _update_queues: + _update_queues[session_id].put(None) import threading t = threading.Thread(target=_run, daemon=True) @@ -365,9 +419,15 @@ def search( listings = [staged.get(l.platform_listing_id, l) for l in listings] # BTF enrichment: scrape /itm/ pages for sellers missing account_age_days. - # Runs in the background so it doesn't delay the response; next search of - # the same sellers will have full scores. - _trigger_scraper_enrichment(listings, shared_store, shared_db) + # Runs in the background so it doesn't delay the response. A session_id is + # generated so the frontend can open an SSE stream and receive live score + # updates as enrichment completes. + session_id = str(uuid.uuid4()) + _update_queues[session_id] = _queue.SimpleQueue() + _trigger_scraper_enrichment( + listings, shared_store, shared_db, + user_db=user_db, query=q, session_id=session_id, + ) scorer = TrustScorer(shared_store) trust_scores_list = scorer.score_batch(listings, q) @@ -411,6 +471,7 @@ def search( "market_price": market_price, "adapter_used": adapter_used, "affiliate_active": bool(os.environ.get("EBAY_AFFILIATE_CAMPAIGN_ID", "").strip()), + "session_id": session_id, } @@ -508,6 +569,73 @@ def enrich_seller( } +# ── SSE live score updates ──────────────────────────────────────────────────── + +@app.get("/api/updates/{session_id}") +async def stream_updates(session_id: str, request: Request): + """Server-Sent Events stream for live trust score updates. + + Opens after a search when any listings have score_is_partial=true. + Streams re-scored trust score payloads as enrichment completes, then + sends a 'done' event and closes. + + Each event payload: + { platform_listing_id, trust_score, seller, market_price } + + Closes automatically after 90 seconds (worst-case Playwright enrichment). + The client should also close on 'done' event. + """ + if session_id not in _update_queues: + raise HTTPException(status_code=404, detail="Unknown session_id") + + q = _update_queues[session_id] + deadline = asyncio.get_event_loop().time() + 90.0 + heartbeat_interval = 15.0 + next_heartbeat = asyncio.get_event_loop().time() + heartbeat_interval + + async def generate(): + nonlocal next_heartbeat + try: + while asyncio.get_event_loop().time() < deadline: + if await request.is_disconnected(): + break + + # Drain all available updates (non-blocking) + while True: + try: + item = q.get_nowait() + except _queue.Empty: + break + if item is None: + # Sentinel: enrichment thread is done + yield "event: done\ndata: {}\n\n" + return + yield f"data: {_json.dumps(item)}\n\n" + + # Heartbeat to keep the connection alive through proxies + now = asyncio.get_event_loop().time() + if now >= next_heartbeat: + yield ": heartbeat\n\n" + next_heartbeat = now + heartbeat_interval + + await asyncio.sleep(0.5) + + # Timeout reached + yield "event: done\ndata: {\"reason\": \"timeout\"}\n\n" + finally: + _update_queues.pop(session_id, None) + + return StreamingResponse( + generate(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "X-Accel-Buffering": "no", # nginx: disable proxy buffering for SSE + "Connection": "keep-alive", + }, + ) + + # ── Saved Searches ──────────────────────────────────────────────────────────── class SavedSearchCreate(BaseModel): diff --git a/web/src/stores/search.ts b/web/src/stores/search.ts index 2269e76..3fca4f8 100644 --- a/web/src/stores/search.ts +++ b/web/src/stores/search.ts @@ -123,8 +123,10 @@ export const useSearchStore = defineStore('search', () => { const affiliateActive = ref(false) const loading = ref(false) const error = ref(null) + const enriching = ref(false) // true while SSE stream is open let _abort: AbortController | null = null + let _sse: EventSource | null = null function cancelSearch() { _abort?.abort() @@ -166,6 +168,7 @@ export const useSearchStore = defineStore('search', () => { market_price: number | null adapter_used: 'api' | 'scraper' affiliate_active: boolean + session_id: string | null } results.value = data.listings ?? [] @@ -182,6 +185,12 @@ export const useSearchStore = defineStore('search', () => { marketPrice: marketPrice.value, adapterUsed: adapterUsed.value, }) + + // Open SSE stream if any scores are partial and a session_id was provided + const hasPartial = Object.values(data.trust_scores ?? {}).some(ts => ts.score_is_partial) + if (data.session_id && hasPartial) { + _openUpdates(data.session_id, apiBase) + } } catch (e) { if (e instanceof DOMException && e.name === 'AbortError') { // User cancelled — clear loading but don't surface as an error @@ -196,6 +205,57 @@ export const useSearchStore = defineStore('search', () => { } } + function closeUpdates() { + if (_sse) { + _sse.close() + _sse = null + } + enriching.value = false + } + + function _openUpdates(sessionId: string, apiBase: string) { + closeUpdates() // close any previous stream + enriching.value = true + + const es = new EventSource(`${apiBase}/api/updates/${sessionId}`) + _sse = es + + es.onmessage = (e) => { + try { + const update = JSON.parse(e.data) as { + platform_listing_id: string + trust_score: TrustScore + seller: Record + market_price: number | null + } + if (update.platform_listing_id && update.trust_score) { + trustScores.value = new Map(trustScores.value) + trustScores.value.set(update.platform_listing_id, update.trust_score) + } + if (update.seller) { + const s = update.seller as Seller + if (s.platform_seller_id) { + sellers.value = new Map(sellers.value) + sellers.value.set(s.platform_seller_id, s) + } + } + if (update.market_price != null) { + marketPrice.value = update.market_price + } + } catch { + // malformed event — ignore + } + } + + es.addEventListener('done', () => { + closeUpdates() + }) + + es.onerror = () => { + closeUpdates() + } + } + async function enrichSeller(sellerUsername: string, listingId: string): Promise { const apiBase = (import.meta.env.VITE_API_BASE as string) ?? '' const params = new URLSearchParams({ @@ -230,10 +290,12 @@ export const useSearchStore = defineStore('search', () => { adapterUsed, affiliateActive, loading, + enriching, error, search, cancelSearch, enrichSeller, + closeUpdates, clearResults, } }) diff --git a/web/src/views/SearchView.vue b/web/src/views/SearchView.vue index 9557693..3ffc87e 100644 --- a/web/src/views/SearchView.vue +++ b/web/src/views/SearchView.vue @@ -312,6 +312,11 @@

+ + + + Updating scores… +