feat: near-term UX batch -- URL normalization, currency preference, async search/SSE
Some checks are pending
CI / Python tests (push) Waiting to run
CI / Frontend typecheck + tests (push) Waiting to run
Mirror / mirror (push) Waiting to run
Release / release (push) Waiting to run

This commit is contained in:
pyr0ball 2026-04-20 11:04:52 -07:00
commit 844721c6fd
10 changed files with 1181 additions and 70 deletions

View file

@ -10,11 +10,13 @@ import json as _json
import logging
import os
import queue as _queue
import re
import uuid
from concurrent.futures import ThreadPoolExecutor
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Optional
from urllib.parse import parse_qs, urlparse
from circuitforge_core.affiliates import wrap_url as _wrap_affiliate_url
from circuitforge_core.api import make_corrections_router as _make_corrections_router
@ -210,6 +212,52 @@ async def _lifespan(app: FastAPI):
_community_store = None
_EBAY_ITM_RE = re.compile(r"/itm/(?:[^/]+/)?(\d{8,13})(?:[/?#]|$)")
_EBAY_ITEM_ID_DIGITS = re.compile(r"^\d{8,13}$")
def _extract_ebay_item_id(q: str) -> str | None:
"""Extract a numeric eBay item ID from a URL, or return None if *q* is not an eBay URL.
Supported formats:
- https://www.ebay.com/itm/Title-String/123456789012
- https://www.ebay.com/itm/123456789012
- https://ebay.com/itm/123456789012
- https://pay.ebay.com/rxo?action=view&sessionid=...&itemId=123456789012
- https://pay.ebay.com/rxo/view?itemId=123456789012
"""
q = q.strip()
# Must look like a URL — require http/https scheme or an ebay.com hostname.
if not (q.startswith("http://") or q.startswith("https://")):
return None
try:
parsed = urlparse(q)
except Exception:
return None
host = parsed.hostname or ""
if not (host == "ebay.com" or host.endswith(".ebay.com")):
return None
# pay.ebay.com checkout URLs — item ID is in the itemId query param.
if host == "pay.ebay.com":
params = parse_qs(parsed.query)
item_id_list = params.get("itemId") or params.get("itemid")
if item_id_list:
candidate = item_id_list[0]
if _EBAY_ITEM_ID_DIGITS.match(candidate):
return candidate
return None
# Standard listing URLs — item ID appears after /itm/.
m = _EBAY_ITM_RE.search(parsed.path)
if m:
return m.group(1)
return None
def _ebay_creds() -> tuple[str, str, str]:
"""Return (client_id, client_secret, env) from env vars.
@ -587,6 +635,13 @@ def search(
adapter: str = "auto", # "auto" | "api" | "scraper" — override adapter selection
session: CloudUser = Depends(get_session),
):
# If the user pasted an eBay listing or checkout URL, extract the item ID
# and use it as the search query so the exact item surfaces in results.
ebay_item_id = _extract_ebay_item_id(q)
if ebay_item_id:
log.info("search: eBay URL detected, extracted item_id=%s", ebay_item_id)
q = ebay_item_id
if not q.strip():
return {"listings": [], "trust_scores": {}, "sellers": {}, "market_price": None, "adapter_used": _adapter_name(adapter)}
@ -800,6 +855,267 @@ def search(
}
# ── Async search (fire-and-forget + SSE streaming) ───────────────────────────
# Module-level executor shared across all async search requests.
# max_workers=4 caps concurrent Playwright/scraper sessions to avoid OOM.
_search_executor = ThreadPoolExecutor(max_workers=4)
@app.get("/api/search/async", status_code=202)
def search_async(
q: str = "",
max_price: Optional[float] = None,
min_price: Optional[float] = None,
pages: int = 1,
must_include: str = "",
must_include_mode: str = "all",
must_exclude: str = "",
category_id: str = "",
adapter: str = "auto",
session: CloudUser = Depends(get_session),
):
"""Async variant of GET /api/search.
Returns HTTP 202 immediately with a session_id, then streams scrape results
and trust scores via GET /api/updates/{session_id} as they become available.
SSE event types pushed to the queue:
{"type": "listings", "listings": [...], "trust_scores": {...}, "sellers": {...},
"market_price": ..., "adapter_used": ..., "affiliate_active": ...}
{"type": "market_price", "market_price": 123.45} (if comp resolves after listings)
{"type": "update", "platform_listing_id": "...", "trust_score": {...},
"seller": {...}, "market_price": ...} (enrichment updates)
None (sentinel stream finished)
"""
# Validate / normalise params — same logic as synchronous endpoint.
ebay_item_id = _extract_ebay_item_id(q)
if ebay_item_id:
q = ebay_item_id
if not q.strip():
# Return a completed (empty) session so the client can open the SSE
# stream and immediately receive a done event.
empty_id = str(uuid.uuid4())
_update_queues[empty_id] = _queue.SimpleQueue()
_update_queues[empty_id].put({
"type": "listings",
"listings": [],
"trust_scores": {},
"sellers": {},
"market_price": None,
"adapter_used": _adapter_name(adapter),
"affiliate_active": bool(os.environ.get("EBAY_AFFILIATE_CAMPAIGN_ID", "").strip()),
})
_update_queues[empty_id].put(None)
return {"session_id": empty_id, "status": "queued"}
features = compute_features(session.tier)
pages = min(max(1, pages), features.max_pages)
session_id = str(uuid.uuid4())
_update_queues[session_id] = _queue.SimpleQueue()
# Capture everything the background worker needs — don't pass session object
# (it may not be safe to use across threads).
_shared_db = session.shared_db
_user_db = session.user_db
_tier = session.tier
_user_id = session.user_id
_affiliate_active = bool(os.environ.get("EBAY_AFFILIATE_CAMPAIGN_ID", "").strip())
def _background_search() -> None:
"""Run the full search pipeline and push SSE events to the queue."""
import hashlib as _hashlib
import sqlite3 as _sqlite3
q_norm = q # captured from outer scope
must_exclude_terms = _parse_terms(must_exclude)
if must_include_mode == "groups" and must_include.strip():
or_groups = parse_groups(must_include)
ebay_queries = expand_queries(q_norm, or_groups)
else:
ebay_queries = [q_norm]
if must_include_mode == "groups" and len(ebay_queries) > 0:
comp_query = ebay_queries[0]
elif must_include_mode == "all" and must_include.strip():
extra = " ".join(_parse_terms(must_include))
comp_query = f"{q_norm} {extra}".strip()
else:
comp_query = q_norm
base_filters = SearchFilters(
max_price=max_price if max_price and max_price > 0 else None,
min_price=min_price if min_price and min_price > 0 else None,
pages=pages,
must_exclude=must_exclude_terms,
category_id=category_id.strip() or None,
)
adapter_used = _adapter_name(adapter)
q_ref = _update_queues.get(session_id)
if q_ref is None:
return # client disconnected before we even started
def _push(event: dict | None) -> None:
"""Push an event to the queue; silently drop if session no longer exists."""
sq = _update_queues.get(session_id)
if sq is not None:
sq.put(event)
try:
def _run_search(ebay_query: str) -> list:
return _make_adapter(Store(_shared_db), adapter).search(ebay_query, base_filters)
def _run_comps() -> None:
try:
_make_adapter(Store(_shared_db), adapter).get_completed_sales(comp_query, pages)
except Exception:
log.warning("async comps: unhandled exception for %r", comp_query, exc_info=True)
max_workers_inner = min(len(ebay_queries) + 1, 5)
with ThreadPoolExecutor(max_workers=max_workers_inner) as ex:
comps_future = ex.submit(_run_comps)
search_futures = [ex.submit(_run_search, eq) for eq in ebay_queries]
seen_ids: set[str] = set()
listings: list = []
for fut in search_futures:
for listing in fut.result():
if listing.platform_listing_id not in seen_ids:
seen_ids.add(listing.platform_listing_id)
listings.append(listing)
comps_future.result()
log.info(
"async_search auth=%s tier=%s adapter=%s pages=%d listings=%d q=%r",
_auth_label(_user_id), _tier, adapter_used, pages, len(listings), q_norm,
)
shared_store = Store(_shared_db)
user_store = Store(_user_db)
user_store.save_listings(listings)
seller_ids = list({l.seller_platform_id for l in listings if l.seller_platform_id})
n_cat = shared_store.refresh_seller_categories("ebay", seller_ids, listing_store=user_store)
if n_cat:
log.info("async_search: category history derived for %d sellers", n_cat)
staged = user_store.get_listings_staged("ebay", [l.platform_listing_id for l in listings])
listings = [staged.get(l.platform_listing_id, l) for l in listings]
_main_adapter = _make_adapter(shared_store, adapter)
sellers_needing_age = [
l.seller_platform_id for l in listings
if l.seller_platform_id
and shared_store.get_seller("ebay", l.seller_platform_id) is not None
and shared_store.get_seller("ebay", l.seller_platform_id).account_age_days is None
]
seen_set: set[str] = set()
sellers_needing_age = [s for s in sellers_needing_age if not (s in seen_set or seen_set.add(s))] # type: ignore[func-returns-value]
# Use a temporary CloudUser-like object for Trading API enrichment
from api.cloud_session import CloudUser as _CloudUser
_session_stub = _CloudUser(
user_id=_user_id,
tier=_tier,
shared_db=_shared_db,
user_db=_user_db,
)
trading_api_enriched = _try_trading_api_enrichment(
_main_adapter, sellers_needing_age, _user_db
)
scorer = TrustScorer(shared_store)
trust_scores_list = scorer.score_batch(listings, q_norm)
user_store.save_trust_scores(trust_scores_list)
# Enqueue vision tasks for paid+ tiers
features_obj = compute_features(_tier)
if features_obj.photo_analysis:
_enqueue_vision_tasks(listings, trust_scores_list, _session_stub)
query_hash = _hashlib.md5(comp_query.encode()).hexdigest()
comp = shared_store.get_market_comp("ebay", query_hash)
market_price = comp.median_price if comp else None
trust_map = {
listing.platform_listing_id: dataclasses.asdict(ts)
for listing, ts in zip(listings, trust_scores_list)
if ts is not None
}
seller_map = {
listing.seller_platform_id: dataclasses.asdict(
shared_store.get_seller("ebay", listing.seller_platform_id)
)
for listing in listings
if listing.seller_platform_id
and shared_store.get_seller("ebay", listing.seller_platform_id)
}
_is_unauthed = _user_id == "anonymous" or _user_id.startswith("guest:")
_pref_store = None if _is_unauthed else user_store
def _get_pref(uid: Optional[str], path: str, default=None):
return _pref_store.get_user_preference(path, default=default) # type: ignore[union-attr]
def _serialize_listing(l: object) -> dict:
d = dataclasses.asdict(l)
d["url"] = _wrap_affiliate_url(
d["url"],
retailer="ebay",
user_id=None if _is_unauthed else _user_id,
get_preference=_get_pref if _pref_store is not None else None,
)
return d
# Push the initial listings batch
_push({
"type": "listings",
"listings": [_serialize_listing(l) for l in listings],
"trust_scores": trust_map,
"sellers": seller_map,
"market_price": market_price,
"adapter_used": adapter_used,
"affiliate_active": _affiliate_active,
"session_id": session_id,
})
# Kick off background enrichment — it pushes "update" events and the sentinel.
_trigger_scraper_enrichment(
listings, shared_store, _shared_db,
user_db=_user_db, query=comp_query, session_id=session_id,
skip_seller_ids=trading_api_enriched,
)
except _sqlite3.OperationalError as e:
log.warning("async_search DB contention: %s", e)
_push({
"type": "listings",
"listings": [],
"trust_scores": {},
"sellers": {},
"market_price": None,
"adapter_used": adapter_used,
"affiliate_active": _affiliate_active,
"session_id": session_id,
})
_push(None)
except Exception as e:
log.warning("async_search background scrape failed: %s", e)
_push({
"type": "error",
"message": str(e),
})
_push(None)
_search_executor.submit(_background_search)
return {"session_id": session_id, "status": "queued"}
# ── On-demand enrichment ──────────────────────────────────────────────────────
@app.post("/api/enrich")
@ -900,21 +1216,33 @@ def enrich_seller(
async def stream_updates(session_id: str, request: Request):
"""Server-Sent Events stream for live trust score updates.
Opens after a search when any listings have score_is_partial=true.
Streams re-scored trust score payloads as enrichment completes, then
sends a 'done' event and closes.
Used both by the synchronous search endpoint (enrichment-only updates) and
the async search endpoint (initial listings + enrichment updates).
Each event payload:
Event data formats:
Enrichment update (legacy / sync search):
{ platform_listing_id, trust_score, seller, market_price }
Async search initial batch:
{ type: "listings", listings, trust_scores, sellers, market_price,
adapter_used, affiliate_active, session_id }
Async search market price resolved after listings:
{ type: "market_price", market_price }
Async search per-seller enrichment update:
{ type: "update", platform_listing_id, trust_score, seller, market_price }
Error:
{ type: "error", message }
Closes automatically after 90 seconds (worst-case Playwright enrichment).
The client should also close on 'done' event.
All events are serialised as plain `data:` lines (no named event type).
The stream ends with a named `event: done` line.
Closes automatically after 150 seconds (covers worst-case async scrape + enrichment).
The client should also close on the 'done' event.
"""
if session_id not in _update_queues:
raise HTTPException(status_code=404, detail="Unknown session_id")
q = _update_queues[session_id]
deadline = asyncio.get_event_loop().time() + 90.0
deadline = asyncio.get_event_loop().time() + 150.0
heartbeat_interval = 15.0
next_heartbeat = asyncio.get_event_loop().time() + heartbeat_interval
@ -1280,6 +1608,11 @@ def get_preferences(session: CloudUser = Depends(get_session)) -> dict:
return store.get_all_preferences()
_SUPPORTED_CURRENCIES = frozenset({
"USD", "GBP", "EUR", "CAD", "AUD", "JPY", "CHF", "MXN", "BRL", "INR",
})
@app.patch("/api/preferences")
def patch_preference(
body: PreferenceUpdate,
@ -1289,6 +1622,7 @@ def patch_preference(
- ``affiliate.opt_out`` available to all signed-in users.
- ``affiliate.byok_ids.ebay`` Premium tier only.
- ``display.currency`` ISO 4217 code from the supported set.
Returns the full updated preferences dict.
"""
@ -1302,6 +1636,15 @@ def patch_preference(
status_code=403,
detail="Custom affiliate IDs (BYOK) require a Premium subscription.",
)
if body.path == "display.currency":
code = str(body.value or "").strip().upper()
if code not in _SUPPORTED_CURRENCIES:
supported = ", ".join(sorted(_SUPPORTED_CURRENCIES))
raise HTTPException(
status_code=400,
detail=f"Unsupported currency code '{body.value}'. Supported codes: {supported}",
)
body = PreferenceUpdate(path=body.path, value=code)
store = Store(session.user_db)
store.set_user_preference(body.path, body.value)
return store.get_all_preferences()

View file

@ -1,5 +1,6 @@
import pytest
from api.main import _extract_ebay_item_id
from app.platforms.ebay.normaliser import normalise_listing, normalise_seller
@ -56,3 +57,48 @@ def test_normalise_seller_maps_fields():
assert seller.feedback_count == 300
assert seller.feedback_ratio == pytest.approx(0.991, abs=0.001)
assert seller.account_age_days > 0
# ── _extract_ebay_item_id ─────────────────────────────────────────────────────
class TestExtractEbayItemId:
"""Unit tests for the URL-to-item-ID normaliser."""
def test_itm_url_with_title_slug(self):
url = "https://www.ebay.com/itm/Sony-WH-1000XM5-Headphones/123456789012"
assert _extract_ebay_item_id(url) == "123456789012"
def test_itm_url_without_title_slug(self):
url = "https://www.ebay.com/itm/123456789012"
assert _extract_ebay_item_id(url) == "123456789012"
def test_itm_url_no_www(self):
url = "https://ebay.com/itm/123456789012"
assert _extract_ebay_item_id(url) == "123456789012"
def test_itm_url_with_query_params(self):
url = "https://www.ebay.com/itm/123456789012?hash=item1234abcd"
assert _extract_ebay_item_id(url) == "123456789012"
def test_pay_ebay_rxo_with_itemId_query_param(self):
url = "https://pay.ebay.com/rxo?action=view&sessionid=abc123&itemId=123456789012"
assert _extract_ebay_item_id(url) == "123456789012"
def test_pay_ebay_rxo_path_with_itemId(self):
url = "https://pay.ebay.com/rxo/view?itemId=123456789012"
assert _extract_ebay_item_id(url) == "123456789012"
def test_non_ebay_url_returns_none(self):
assert _extract_ebay_item_id("https://amazon.com/dp/B08N5WRWNW") is None
def test_plain_keyword_returns_none(self):
assert _extract_ebay_item_id("rtx 4090 gpu") is None
def test_empty_string_returns_none(self):
assert _extract_ebay_item_id("") is None
def test_ebay_url_no_item_id_returns_none(self):
assert _extract_ebay_item_id("https://www.ebay.com/sch/i.html?_nkw=gpu") is None
def test_pay_ebay_no_item_id_returns_none(self):
assert _extract_ebay_item_id("https://pay.ebay.com/rxo?action=view&sessionid=abc") is None

231
tests/test_async_search.py Normal file
View file

@ -0,0 +1,231 @@
"""Tests for GET /api/search/async (fire-and-forget search + SSE streaming).
Verifies:
- Returns HTTP 202 with session_id and status: "queued"
- session_id is registered in _update_queues immediately
- Actual scraping is not performed (mocked out)
- Empty query path returns a completed session with done event
"""
from __future__ import annotations
import os
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from fastapi.testclient import TestClient
# ── Fixtures ──────────────────────────────────────────────────────────────────
@pytest.fixture
def client(tmp_path):
"""TestClient with a fresh tmp DB. Must set SNIPE_DB *before* importing app."""
os.environ["SNIPE_DB"] = str(tmp_path / "snipe.db")
from api.main import app
return TestClient(app, raise_server_exceptions=False)
def _make_mock_listing():
"""Return a minimal mock listing object that satisfies the search pipeline."""
m = MagicMock()
m.platform_listing_id = "123456789"
m.seller_platform_id = "test_seller"
m.title = "Test GPU"
m.price = 100.0
m.currency = "USD"
m.condition = "Used"
m.url = "https://www.ebay.com/itm/123456789"
m.photo_urls = []
m.listing_age_days = 5
m.buying_format = "fixed_price"
m.ends_at = None
m.fetched_at = None
m.trust_score_id = None
m.id = 1
m.category_name = None
return m
# ── Core contract tests ───────────────────────────────────────────────────────
def test_async_search_returns_202(client):
"""GET /api/search/async?q=... returns HTTP 202 with session_id and status."""
with (
patch("api.main._make_adapter") as mock_adapter_factory,
patch("api.main._trigger_scraper_enrichment"),
patch("api.main.TrustScorer") as mock_scorer_cls,
):
mock_adapter = MagicMock()
mock_adapter.search.return_value = []
mock_adapter.get_completed_sales.return_value = None
mock_adapter_factory.return_value = mock_adapter
mock_scorer = MagicMock()
mock_scorer.score_batch.return_value = []
mock_scorer_cls.return_value = mock_scorer
resp = client.get("/api/search/async?q=test+gpu")
assert resp.status_code == 202
data = resp.json()
assert "session_id" in data
assert data["status"] == "queued"
assert isinstance(data["session_id"], str)
assert len(data["session_id"]) > 0
def test_async_search_registers_session_id(client):
"""session_id returned by 202 response must appear in _update_queues immediately."""
with (
patch("api.main._make_adapter") as mock_adapter_factory,
patch("api.main._trigger_scraper_enrichment"),
patch("api.main.TrustScorer") as mock_scorer_cls,
):
mock_adapter = MagicMock()
mock_adapter.search.return_value = []
mock_adapter.get_completed_sales.return_value = None
mock_adapter_factory.return_value = mock_adapter
mock_scorer = MagicMock()
mock_scorer.score_batch.return_value = []
mock_scorer_cls.return_value = mock_scorer
resp = client.get("/api/search/async?q=test+gpu")
assert resp.status_code == 202
session_id = resp.json()["session_id"]
# The queue must be registered so the SSE endpoint can open it.
from api.main import _update_queues
assert session_id in _update_queues
def test_async_search_empty_query(client):
"""Empty query returns 202 with a pre-loaded done sentinel, no scraping needed."""
resp = client.get("/api/search/async?q=")
assert resp.status_code == 202
data = resp.json()
assert data["status"] == "queued"
assert "session_id" in data
from api.main import _update_queues
import queue as _queue
sid = data["session_id"]
assert sid in _update_queues
q = _update_queues[sid]
# First item should be the empty listings event
first = q.get_nowait()
assert first is not None
assert first["type"] == "listings"
assert first["listings"] == []
# Second item should be the sentinel
sentinel = q.get_nowait()
assert sentinel is None
def test_async_search_no_real_chromium(client):
"""Async search endpoint must not launch real Chromium in tests.
Verifies that the background scraper is submitted to the executor but the
adapter factory is patched no real Playwright/Xvfb process is spawned.
Uses a broad patch on Store to avoid sqlite3 DB path issues in the thread pool.
"""
import threading
scrape_called = threading.Event()
def _fake_search(query, filters):
scrape_called.set()
return []
with (
patch("api.main._make_adapter") as mock_adapter_factory,
patch("api.main._trigger_scraper_enrichment"),
patch("api.main.TrustScorer") as mock_scorer_cls,
patch("api.main.Store") as mock_store_cls,
):
mock_adapter = MagicMock()
mock_adapter.search.side_effect = _fake_search
mock_adapter.get_completed_sales.return_value = None
mock_adapter_factory.return_value = mock_adapter
mock_scorer = MagicMock()
mock_scorer.score_batch.return_value = []
mock_scorer_cls.return_value = mock_scorer
mock_store = MagicMock()
mock_store.get_listings_staged.return_value = {}
mock_store.refresh_seller_categories.return_value = 0
mock_store.save_listings.return_value = None
mock_store.save_trust_scores.return_value = None
mock_store.get_market_comp.return_value = None
mock_store.get_seller.return_value = None
mock_store.get_user_preference.return_value = None
mock_store_cls.return_value = mock_store
resp = client.get("/api/search/async?q=rtx+3080")
assert resp.status_code == 202
# Give the background worker a moment to run (it's in a thread pool)
scrape_called.wait(timeout=5.0)
# If we get here without a real Playwright process, the test passes.
assert scrape_called.is_set(), "Background search worker never ran"
def test_async_search_query_params_forwarded(client):
"""All filter params accepted by /api/search are also accepted here."""
with (
patch("api.main._make_adapter") as mock_adapter_factory,
patch("api.main._trigger_scraper_enrichment"),
patch("api.main.TrustScorer") as mock_scorer_cls,
):
mock_adapter = MagicMock()
mock_adapter.search.return_value = []
mock_adapter.get_completed_sales.return_value = None
mock_adapter_factory.return_value = mock_adapter
mock_scorer = MagicMock()
mock_scorer.score_batch.return_value = []
mock_scorer_cls.return_value = mock_scorer
resp = client.get(
"/api/search/async"
"?q=rtx+3080"
"&max_price=400"
"&min_price=100"
"&pages=2"
"&must_include=rtx,3080"
"&must_include_mode=all"
"&must_exclude=mining"
"&category_id=27386"
"&adapter=auto"
)
assert resp.status_code == 202
def test_async_search_session_id_is_uuid(client):
"""session_id must be a valid UUID v4 string."""
import uuid as _uuid
with (
patch("api.main._make_adapter") as mock_adapter_factory,
patch("api.main._trigger_scraper_enrichment"),
patch("api.main.TrustScorer") as mock_scorer_cls,
):
mock_adapter = MagicMock()
mock_adapter.search.return_value = []
mock_adapter.get_completed_sales.return_value = None
mock_adapter_factory.return_value = mock_adapter
mock_scorer = MagicMock()
mock_scorer.score_batch.return_value = []
mock_scorer_cls.return_value = mock_scorer
resp = client.get("/api/search/async?q=test")
assert resp.status_code == 202
sid = resp.json()["session_id"]
# Should not raise if it's a valid UUID
parsed = _uuid.UUID(sid)
assert str(parsed) == sid

View file

@ -0,0 +1,76 @@
"""Tests for PATCH /api/preferences display.currency validation."""
from __future__ import annotations
import os
from pathlib import Path
from unittest.mock import patch
import pytest
from fastapi.testclient import TestClient
@pytest.fixture
def client(tmp_path):
"""TestClient with a patched local DB path.
api.cloud_session._LOCAL_SNIPE_DB is set at module import time, so we
cannot rely on setting SNIPE_DB before import when other tests have already
triggered the module load. Patch the module-level variable directly so
the session dependency points at our fresh tmp DB for the duration of this
fixture.
"""
db_path = tmp_path / "snipe.db"
# Ensure the DB is initialised so the Store can create its tables.
import api.cloud_session as _cs
from circuitforge_core.db import get_connection, run_migrations
conn = get_connection(db_path)
run_migrations(conn, Path("app/db/migrations"))
conn.close()
from api.main import app
with patch.object(_cs, "_LOCAL_SNIPE_DB", db_path):
yield TestClient(app, raise_server_exceptions=False)
def test_set_display_currency_valid(client):
"""Accepted ISO 4217 codes are stored and returned."""
for code in ("USD", "GBP", "EUR", "CAD", "AUD", "JPY", "CHF", "MXN", "BRL", "INR"):
resp = client.patch("/api/preferences", json={"path": "display.currency", "value": code})
assert resp.status_code == 200, f"Expected 200 for {code}, got {resp.status_code}: {resp.text}"
data = resp.json()
assert data.get("display", {}).get("currency") == code
def test_set_display_currency_normalises_lowercase(client):
"""Lowercase code is accepted and normalised to uppercase."""
resp = client.patch("/api/preferences", json={"path": "display.currency", "value": "eur"})
assert resp.status_code == 200
assert resp.json()["display"]["currency"] == "EUR"
def test_set_display_currency_unsupported_returns_400(client):
"""Unsupported currency code returns 400 with a clear message."""
resp = client.patch("/api/preferences", json={"path": "display.currency", "value": "XYZ"})
assert resp.status_code == 400
detail = resp.json().get("detail", "")
assert "XYZ" in detail
assert "Supported" in detail or "supported" in detail
def test_set_display_currency_empty_string_returns_400(client):
"""Empty string is not a valid currency code."""
resp = client.patch("/api/preferences", json={"path": "display.currency", "value": ""})
assert resp.status_code == 400
def test_set_display_currency_none_returns_400(client):
"""None is not a valid currency code."""
resp = client.patch("/api/preferences", json={"path": "display.currency", "value": None})
assert resp.status_code == 400
def test_other_preference_paths_unaffected(client):
"""Unrelated preference paths still work normally after currency validation added."""
resp = client.patch("/api/preferences", json={"path": "affiliate.opt_out", "value": True})
assert resp.status_code == 200
assert resp.json().get("affiliate", {}).get("opt_out") is True

View file

@ -0,0 +1,140 @@
import { beforeEach, describe, expect, it, vi } from 'vitest'
// Reset module-level cache and fetch mock between tests
beforeEach(async () => {
vi.restoreAllMocks()
// Reset module-level cache so each test starts clean
const mod = await import('../composables/useCurrency')
mod._resetCacheForTest()
})
const MOCK_RATES: Record<string, number> = {
USD: 1,
GBP: 0.79,
EUR: 0.92,
JPY: 151.5,
CAD: 1.36,
}
function mockFetchSuccess(rates = MOCK_RATES) {
vi.stubGlobal('fetch', vi.fn().mockResolvedValue({
ok: true,
json: async () => ({ rates }),
}))
}
function mockFetchFailure() {
vi.stubGlobal('fetch', vi.fn().mockRejectedValue(new Error('Network error')))
}
describe('convertFromUSD', () => {
it('returns the same amount for USD (no conversion)', async () => {
mockFetchSuccess()
const { convertFromUSD } = await import('../composables/useCurrency')
const result = await convertFromUSD(100, 'USD')
expect(result).toBe(100)
// fetch should not be called for USD passthrough
expect(fetch).not.toHaveBeenCalled()
})
it('converts USD to GBP using fetched rates', async () => {
mockFetchSuccess()
const { convertFromUSD, _resetCacheForTest } = await import('../composables/useCurrency')
_resetCacheForTest()
const result = await convertFromUSD(100, 'GBP')
expect(result).toBeCloseTo(79, 1)
})
it('converts USD to JPY using fetched rates', async () => {
mockFetchSuccess()
const { convertFromUSD, _resetCacheForTest } = await import('../composables/useCurrency')
_resetCacheForTest()
const result = await convertFromUSD(10, 'JPY')
expect(result).toBeCloseTo(1515, 1)
})
it('returns the original amount when rates are unavailable (network failure)', async () => {
mockFetchFailure()
const { convertFromUSD, _resetCacheForTest } = await import('../composables/useCurrency')
_resetCacheForTest()
const result = await convertFromUSD(100, 'EUR')
expect(result).toBe(100)
})
it('returns the original amount when the currency code is unknown', async () => {
mockFetchSuccess({ USD: 1, EUR: 0.92 }) // no XYZ rate
const { convertFromUSD, _resetCacheForTest } = await import('../composables/useCurrency')
_resetCacheForTest()
const result = await convertFromUSD(50, 'XYZ')
expect(result).toBe(50)
})
it('only calls fetch once when called concurrently (deduplication)', async () => {
mockFetchSuccess()
const { convertFromUSD, _resetCacheForTest } = await import('../composables/useCurrency')
_resetCacheForTest()
await Promise.all([
convertFromUSD(100, 'GBP'),
convertFromUSD(200, 'EUR'),
convertFromUSD(50, 'CAD'),
])
expect((fetch as ReturnType<typeof vi.fn>).mock.calls.length).toBe(1)
})
})
describe('formatPrice', () => {
it('formats USD amount with dollar sign', async () => {
mockFetchSuccess()
const { formatPrice, _resetCacheForTest } = await import('../composables/useCurrency')
_resetCacheForTest()
const result = await formatPrice(99.99, 'USD')
expect(result).toMatch(/^\$99\.99$|^\$100$/) // Intl rounding may vary
expect(result).toContain('$')
})
it('formats GBP amount with correct symbol', async () => {
mockFetchSuccess()
const { formatPrice, _resetCacheForTest } = await import('../composables/useCurrency')
_resetCacheForTest()
const result = await formatPrice(100, 'GBP')
// GBP 79 — expect pound sign or "GBP" prefix
expect(result).toMatch(/[£]|GBP/)
})
it('formats JPY without decimal places (Intl rounds to zero decimals)', async () => {
mockFetchSuccess()
const { formatPrice, _resetCacheForTest } = await import('../composables/useCurrency')
_resetCacheForTest()
const result = await formatPrice(10, 'JPY')
// 10 * 151.5 = 1515 JPY — no decimal places for JPY
expect(result).toMatch(/¥1,515|JPY.*1,515|¥1515/)
})
it('falls back gracefully on network failure, showing USD', async () => {
mockFetchFailure()
const { formatPrice, _resetCacheForTest } = await import('../composables/useCurrency')
_resetCacheForTest()
// With failed rates, conversion returns original amount and uses Intl with target currency
// This may throw if Intl doesn't know EUR — but the function should not throw
const result = await formatPrice(50, 'EUR')
expect(typeof result).toBe('string')
expect(result.length).toBeGreaterThan(0)
})
})
describe('formatPriceUSD', () => {
it('formats a USD amount synchronously', async () => {
const { formatPriceUSD } = await import('../composables/useCurrency')
const result = formatPriceUSD(1234.5)
// Intl output varies by runtime locale data; check structure not exact string
expect(result).toContain('$')
expect(result).toContain('1,234')
})
it('formats zero as a USD string', async () => {
const { formatPriceUSD } = await import('../composables/useCurrency')
const result = formatPriceUSD(0)
expect(result).toContain('$')
expect(result).toMatch(/\$0/)
})
})

View file

@ -189,15 +189,18 @@
</template>
<script setup lang="ts">
import { computed, ref } from 'vue'
import { computed, ref, watch } from 'vue'
import { RouterLink } from 'vue-router'
import type { Listing, TrustScore, Seller } from '../stores/search'
import { useSearchStore } from '../stores/search'
import { useBlocklistStore } from '../stores/blocklist'
import TrustFeedbackButtons from './TrustFeedbackButtons.vue'
import { useTrustSignalPref } from '../composables/useTrustSignalPref'
import { formatPrice, formatPriceUSD } from '../composables/useCurrency'
import { usePreferencesStore } from '../stores/preferences'
const { enabled: trustSignalEnabled } = useTrustSignalPref()
const prefsStore = usePreferencesStore()
const props = defineProps<{
listing: Listing
@ -379,15 +382,26 @@ const isSteal = computed(() => {
return props.listing.price < props.marketPrice * 0.8
})
const formattedPrice = computed(() => {
const sym = props.listing.currency === 'USD' ? '$' : props.listing.currency + ' '
return `${sym}${props.listing.price.toLocaleString('en-US', { minimumFractionDigits: 0, maximumFractionDigits: 2 })}`
})
// Async price display show USD synchronously while rates load, then update
const formattedPrice = ref(formatPriceUSD(props.listing.price))
const formattedMarket = ref(props.marketPrice ? formatPriceUSD(props.marketPrice) : '')
const formattedMarket = computed(() => {
if (!props.marketPrice) return ''
return `$${props.marketPrice.toLocaleString('en-US', { maximumFractionDigits: 0 })}`
})
async function _updatePrices() {
const currency = prefsStore.displayCurrency
formattedPrice.value = await formatPrice(props.listing.price, currency)
if (props.marketPrice) {
formattedMarket.value = await formatPrice(props.marketPrice, currency)
} else {
formattedMarket.value = ''
}
}
// Update when the listing, marketPrice, or display currency changes
watch(
[() => props.listing.price, () => props.marketPrice, () => prefsStore.displayCurrency],
() => { _updatePrices() },
{ immediate: true },
)
</script>
<style scoped>

View file

@ -0,0 +1,102 @@
/**
* useCurrency live exchange rate conversion from USD to a target display currency.
*
* Rates are fetched lazily on first use from open.er-api.com (free, no key required).
* A module-level cache with a 1-hour TTL prevents redundant network calls.
* On fetch failure the composable falls back silently to USD display.
*/
const ER_API_URL = 'https://open.er-api.com/v6/latest/USD'
const CACHE_TTL_MS = 60 * 60 * 1000 // 1 hour
interface RateCache {
rates: Record<string, number>
fetchedAt: number
}
// Module-level cache shared across all composable instances
let _cache: RateCache | null = null
let _inflight: Promise<Record<string, number>> | null = null
async function _fetchRates(): Promise<Record<string, number>> {
const now = Date.now()
if (_cache && now - _cache.fetchedAt < CACHE_TTL_MS) {
return _cache.rates
}
// Deduplicate concurrent calls — reuse the same in-flight fetch
if (_inflight) {
return _inflight
}
_inflight = (async () => {
try {
const res = await fetch(ER_API_URL)
if (!res.ok) throw new Error(`ER-API responded ${res.status}`)
const data = await res.json()
const rates: Record<string, number> = data.rates ?? {}
_cache = { rates, fetchedAt: Date.now() }
return rates
} catch {
// Return cached stale data if available, otherwise empty object (USD passthrough)
return _cache?.rates ?? {}
} finally {
_inflight = null
}
})()
return _inflight
}
/**
* Convert an amount in USD to the target currency using the latest exchange rates.
* Returns the original amount unchanged if rates are unavailable or the currency is USD.
*/
export async function convertFromUSD(amountUSD: number, targetCurrency: string): Promise<number> {
if (targetCurrency === 'USD') return amountUSD
const rates = await _fetchRates()
const rate = rates[targetCurrency]
if (!rate) return amountUSD
return amountUSD * rate
}
/**
* Format a USD amount as a localized string in the target currency.
* Fetches exchange rates lazily. Falls back to USD display if rates are unavailable.
*
* Returns a plain USD string synchronously on first call while rates load;
* callers should use a ref that updates once the promise resolves.
*/
export async function formatPrice(amountUSD: number, currency: string): Promise<string> {
const converted = await convertFromUSD(amountUSD, currency)
try {
return new Intl.NumberFormat('en-US', {
style: 'currency',
currency,
minimumFractionDigits: 0,
maximumFractionDigits: 2,
}).format(converted)
} catch {
// Fallback if Intl doesn't know the currency code
return `${currency} ${converted.toLocaleString('en-US', { minimumFractionDigits: 0, maximumFractionDigits: 2 })}`
}
}
/**
* Synchronous USD-only formatter for use before rates have loaded.
*/
export function formatPriceUSD(amountUSD: number): string {
return new Intl.NumberFormat('en-US', {
style: 'currency',
currency: 'USD',
minimumFractionDigits: 0,
maximumFractionDigits: 2,
}).format(amountUSD)
}
// Exported for testing — allows resetting module-level cache between test cases
export function _resetCacheForTest(): void {
_cache = null
_inflight = null
}

View file

@ -12,8 +12,14 @@ export interface UserPreferences {
community?: {
blocklist_share?: boolean
}
display?: {
currency?: string
}
}
const CURRENCY_LS_KEY = 'snipe:currency'
const DEFAULT_CURRENCY = 'USD'
const apiBase = (import.meta.env.VITE_API_BASE as string) ?? ''
export const usePreferencesStore = defineStore('preferences', () => {
@ -26,14 +32,34 @@ export const usePreferencesStore = defineStore('preferences', () => {
const affiliateByokId = computed(() => prefs.value.affiliate?.byok_ids?.ebay ?? '')
const communityBlocklistShare = computed(() => prefs.value.community?.blocklist_share ?? false)
// displayCurrency: DB preference for logged-in users, localStorage for anon users
const displayCurrency = computed((): string => {
return prefs.value.display?.currency ?? DEFAULT_CURRENCY
})
async function load() {
if (!session.isLoggedIn) return
if (!session.isLoggedIn) {
// Anonymous user: read currency from localStorage
const stored = localStorage.getItem(CURRENCY_LS_KEY)
if (stored) {
prefs.value = { ...prefs.value, display: { ...prefs.value.display, currency: stored } }
}
return
}
loading.value = true
error.value = null
try {
const res = await fetch(`${apiBase}/api/preferences`)
if (res.ok) {
prefs.value = await res.json()
const data: UserPreferences = await res.json()
// Migration: if logged in but no DB preference, fall back to localStorage value
if (!data.display?.currency) {
const lsVal = localStorage.getItem(CURRENCY_LS_KEY)
if (lsVal) {
data.display = { ...data.display, currency: lsVal }
}
}
prefs.value = data
}
} catch {
// Non-cloud deploy or network error — preferences unavailable
@ -75,6 +101,18 @@ export const usePreferencesStore = defineStore('preferences', () => {
await setPref('community.blocklist_share', value)
}
async function setDisplayCurrency(code: string) {
const upper = code.toUpperCase()
// Optimistic local update so the UI reacts immediately
prefs.value = { ...prefs.value, display: { ...prefs.value.display, currency: upper } }
if (session.isLoggedIn) {
await setPref('display.currency', upper)
} else {
// Anonymous user: persist to localStorage only
localStorage.setItem(CURRENCY_LS_KEY, upper)
}
}
return {
prefs,
loading,
@ -82,9 +120,11 @@ export const usePreferencesStore = defineStore('preferences', () => {
affiliateOptOut,
affiliateByokId,
communityBlocklistShare,
displayCurrency,
load,
setAffiliateOptOut,
setAffiliateByokId,
setCommunityBlocklistShare,
setDisplayCurrency,
}
})

View file

@ -145,6 +145,7 @@ export const useSearchStore = defineStore('search', () => {
_abort?.abort()
_abort = null
loading.value = false
closeUpdates()
}
async function search(q: string, filters: SearchFilters = {}) {
@ -158,8 +159,6 @@ export const useSearchStore = defineStore('search', () => {
error.value = null
try {
// TODO: POST /api/search with { query: q, filters }
// API does not exist yet — stub returns empty results
// VITE_API_BASE is '' in dev; '/snipe' under menagerie (baked at build time by Vite)
const apiBase = (import.meta.env.VITE_API_BASE as string) ?? ''
const params = new URLSearchParams({ q })
@ -174,51 +173,36 @@ export const useSearchStore = defineStore('search', () => {
if (filters.mustExclude?.trim()) params.set('must_exclude', filters.mustExclude.trim())
if (filters.categoryId?.trim()) params.set('category_id', filters.categoryId.trim())
if (filters.adapter && filters.adapter !== 'auto') params.set('adapter', filters.adapter)
const res = await fetch(`${apiBase}/api/search?${params}`, { signal })
// Use the async endpoint: returns 202 immediately with a session_id, then
// streams listings + trust scores via SSE as the scrape completes.
const res = await fetch(`${apiBase}/api/search/async?${params}`, { signal })
if (!res.ok) throw new Error(`Search failed: ${res.status} ${res.statusText}`)
const data = await res.json() as {
listings: Listing[]
trust_scores: Record<string, TrustScore>
sellers: Record<string, Seller>
market_price: number | null
adapter_used: 'api' | 'scraper'
affiliate_active: boolean
session_id: string | null
session_id: string
status: 'queued'
}
results.value = data.listings ?? []
trustScores.value = new Map(Object.entries(data.trust_scores ?? {}))
sellers.value = new Map(Object.entries(data.sellers ?? {}))
marketPrice.value = data.market_price ?? null
adapterUsed.value = data.adapter_used ?? null
affiliateActive.value = data.affiliate_active ?? false
saveCache({
query: q,
results: results.value,
trustScores: data.trust_scores ?? {},
sellers: data.sellers ?? {},
marketPrice: marketPrice.value,
adapterUsed: adapterUsed.value,
})
// Open SSE stream if any scores are partial and a session_id was provided
const hasPartial = Object.values(data.trust_scores ?? {}).some(ts => ts.score_is_partial)
if (data.session_id && hasPartial) {
_openUpdates(data.session_id, apiBase)
}
// HTTP 202 received — scraping is underway in the background.
// Stay in loading state until the first "listings" SSE event arrives.
// loading.value stays true; enriching tracks the SSE stream being open.
enriching.value = true
_openUpdates(data.session_id, apiBase)
} catch (e) {
if (e instanceof DOMException && e.name === 'AbortError') {
// User cancelled — clear loading but don't surface as an error
results.value = []
loading.value = false
} else {
error.value = e instanceof Error ? e.message : 'Unknown error'
results.value = []
loading.value = false
}
} finally {
loading.value = false
_abort = null
}
// Note: loading.value is NOT set to false here — it stays true until the
// first "listings" SSE event arrives (see _openUpdates handler below).
}
function closeUpdates() {
@ -229,34 +213,115 @@ export const useSearchStore = defineStore('search', () => {
enriching.value = false
}
// Internal type for typed SSE events from the async search endpoint
type _AsyncListingsEvent = {
type: 'listings'
listings: Listing[]
trust_scores: Record<string, TrustScore>
sellers: Record<string, Seller>
market_price: number | null
adapter_used: 'api' | 'scraper'
affiliate_active: boolean
session_id: string
}
type _MarketPriceEvent = {
type: 'market_price'
market_price: number | null
}
type _UpdateEvent = {
type: 'update'
platform_listing_id: string
trust_score: TrustScore
seller: Seller
market_price: number | null
}
type _LegacyUpdateEvent = {
platform_listing_id: string
trust_score: TrustScore
seller: Record<string, unknown>
market_price: number | null
}
type _SSEEvent =
| _AsyncListingsEvent
| _MarketPriceEvent
| _UpdateEvent
| _LegacyUpdateEvent
function _openUpdates(sessionId: string, apiBase: string) {
closeUpdates() // close any previous stream
enriching.value = true
// Close any pre-existing stream but preserve enriching state — caller sets it.
if (_sse) {
_sse.close()
_sse = null
}
const es = new EventSource(`${apiBase}/api/updates/${sessionId}`)
_sse = es
es.onmessage = (e) => {
try {
const update = JSON.parse(e.data) as {
platform_listing_id: string
trust_score: TrustScore
seller: Record<string, unknown>
market_price: number | null
}
if (update.platform_listing_id && update.trust_score) {
trustScores.value = new Map(trustScores.value)
trustScores.value.set(update.platform_listing_id, update.trust_score)
}
if (update.seller) {
const s = update.seller as Seller
if (s.platform_seller_id) {
sellers.value = new Map(sellers.value)
sellers.value.set(s.platform_seller_id, s)
const update = JSON.parse(e.data) as _SSEEvent
if ('type' in update) {
// Typed events from the async search endpoint
if (update.type === 'listings') {
// First batch: hydrate store and transition out of loading state
results.value = update.listings ?? []
trustScores.value = new Map(Object.entries(update.trust_scores ?? {}))
sellers.value = new Map(Object.entries(update.sellers ?? {}))
marketPrice.value = update.market_price ?? null
adapterUsed.value = update.adapter_used ?? null
affiliateActive.value = update.affiliate_active ?? false
saveCache({
query: query.value,
results: results.value,
trustScores: update.trust_scores ?? {},
sellers: update.sellers ?? {},
marketPrice: marketPrice.value,
adapterUsed: adapterUsed.value,
})
// Scrape complete — turn off the initial loading spinner.
// enriching stays true while enrichment SSE is still open.
loading.value = false
} else if (update.type === 'market_price') {
if (update.market_price != null) {
marketPrice.value = update.market_price
}
} else if (update.type === 'update') {
// Per-seller enrichment update (same as legacy format but typed)
if (update.platform_listing_id && update.trust_score) {
trustScores.value = new Map(trustScores.value)
trustScores.value.set(update.platform_listing_id, update.trust_score)
}
if (update.seller?.platform_seller_id) {
sellers.value = new Map(sellers.value)
sellers.value.set(update.seller.platform_seller_id, update.seller)
}
if (update.market_price != null) {
marketPrice.value = update.market_price
}
}
// type: "error" — no special handling; stream will close via 'done'
} else {
// Legacy enrichment update (no type field) from synchronous search path
const legacy = update as _LegacyUpdateEvent
if (legacy.platform_listing_id && legacy.trust_score) {
trustScores.value = new Map(trustScores.value)
trustScores.value.set(legacy.platform_listing_id, legacy.trust_score)
}
if (legacy.seller) {
const s = legacy.seller as Seller
if (s.platform_seller_id) {
sellers.value = new Map(sellers.value)
sellers.value.set(s.platform_seller_id, s)
}
}
if (legacy.market_price != null) {
marketPrice.value = legacy.market_price
}
}
if (update.market_price != null) {
marketPrice.value = update.market_price
}
} catch {
// malformed event — ignore
@ -268,6 +333,8 @@ export const useSearchStore = defineStore('search', () => {
})
es.onerror = () => {
// If loading is still true (never got a "listings" event), clear it
loading.value = false
closeUpdates()
}
}

View file

@ -69,6 +69,28 @@
>{{ opt.label }}</button>
</div>
</div>
<!-- Display currency -->
<div class="settings-toggle">
<div class="settings-toggle-text">
<span class="settings-toggle-label">Display currency</span>
<span class="settings-toggle-desc">
Listing prices are converted from USD using live exchange rates.
Rates update hourly.
</span>
</div>
<select
id="display-currency"
class="settings-select"
:value="prefs.displayCurrency"
aria-label="Select display currency"
@change="prefs.setDisplayCurrency(($event.target as HTMLSelectElement).value)"
>
<option v-for="opt in currencyOptions" :key="opt.code" :value="opt.code">
{{ opt.code }} {{ opt.label }}
</option>
</select>
</div>
</section>
<!-- Affiliate Links only shown to signed-in cloud users -->
@ -166,6 +188,18 @@ const themeOptions: { value: 'system' | 'dark' | 'light'; label: string }[] = [
{ value: 'dark', label: 'Dark' },
{ value: 'light', label: 'Light' },
]
const currencyOptions: { code: string; label: string }[] = [
{ code: 'USD', label: 'US Dollar' },
{ code: 'EUR', label: 'Euro' },
{ code: 'GBP', label: 'British Pound' },
{ code: 'CAD', label: 'Canadian Dollar' },
{ code: 'AUD', label: 'Australian Dollar' },
{ code: 'JPY', label: 'Japanese Yen' },
{ code: 'CHF', label: 'Swiss Franc' },
{ code: 'MXN', label: 'Mexican Peso' },
{ code: 'BRL', label: 'Brazilian Real' },
{ code: 'INR', label: 'Indian Rupee' },
]
const session = useSessionStore()
const prefs = usePreferencesStore()
const { autoRun: llmAutoRun, setAutoRun: setLLMAutoRun } = useLLMQueryBuilder()
@ -346,6 +380,24 @@ function saveByokId() {
margin: 0;
}
.settings-select {
padding: var(--space-2) var(--space-3);
background: var(--color-surface);
border: 1px solid var(--color-border);
border-radius: var(--radius-md);
color: var(--color-text);
font-size: 0.875rem;
font-family: inherit;
cursor: pointer;
outline: none;
flex-shrink: 0;
transition: border-color 0.15s ease;
}
.settings-select:focus {
border-color: var(--app-primary);
}
.theme-btn-group {
display: flex;
gap: 0;