From e539427becd4dbe3d4482e7bbcd73e3f0af0ef63 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Sun, 19 Apr 2026 21:26:20 -0700 Subject: [PATCH] fix: catch sqlite3.OperationalError in search post-processing Under high concurrency (100+ users), shared_db write contention causes database is locked errors in the unguarded post-scrape block. These were surfacing as 500s because there was no exception handler after line 663. Now catches OperationalError and returns raw listings with empty trust scores/sellers (degraded mode) instead of crashing. The SSE queue entry is cleaned up on this path so no orphaned queue accumulates. Root cause: shared_db (sellers, market_comps) is SQLite; at 100 concurrent writers the WAL write queue exceeds the 30s busy timeout. Long-term fix is migrating shared state to Postgres (see snipe#NN). Refs: infra#12 load test Phase 2 spike findings --- api/main.py | 545 ++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 469 insertions(+), 76 deletions(-) diff --git a/api/main.py b/api/main.py index 79741ca..726c553 100644 --- a/api/main.py +++ b/api/main.py @@ -147,8 +147,58 @@ async def _lifespan(app: FastAPI): except Exception: log.warning("LLM query builder init failed.", exc_info=True) + # Background monitor polling loop — checks every 60s for due saved-search monitors. + import asyncio + + async def _monitor_loop(db: Path) -> None: + import asyncio as _asyncio + from app.tasks.monitor import run_monitor_search + + while True: + try: + await _asyncio.sleep(60) + sched_store = Store(db) + due = sched_store.list_due_active_monitors() + for user_db_path, saved_search_id, _ in due: + user_db = Path(user_db_path) + if not user_db.exists(): + log.warning("Monitor: user_db not found, skipping: %s", user_db_path) + sched_store.remove_active_monitor(user_db_path, saved_search_id) + continue + user_store = Store(user_db) + searches = [s for s in user_store.list_monitored_searches() if s.id == saved_search_id] + if not searches: + # Search was deleted or monitoring disabled — clean up registry. + sched_store.remove_active_monitor(user_db_path, saved_search_id) + continue + try: + await asyncio.to_thread( + run_monitor_search, searches[0], + user_db=user_db, shared_db=db, + ) + sched_store.mark_active_monitor_checked(user_db_path, saved_search_id) + except Exception: + log.exception( + "Monitor: run failed for search %d (user_db=%s)", + saved_search_id, user_db_path, + ) + except asyncio.CancelledError: + break + except Exception: + log.exception("Monitor: polling tick error") + + _monitor_task = asyncio.create_task(_monitor_loop(sched_db)) + log.info("Snipe monitor polling loop started.") + yield + _monitor_task.cancel() + try: + await _monitor_task + except Exception: + pass + log.info("Snipe monitor polling loop stopped.") + get_scheduler(sched_db).shutdown(timeout=10.0) reset_scheduler() log.info("Snipe task scheduler stopped.") @@ -266,6 +316,7 @@ def _trigger_scraper_enrichment( user_db: Path | None = None, query: str = "", session_id: str | None = None, + skip_seller_ids: "set[str] | None" = None, ) -> None: """Fire-and-forget background enrichment for missing seller signals. @@ -281,9 +332,11 @@ def _trigger_scraper_enrichment( user_db: path to per-user listings/trust_scores DB (same as shared_db in local mode). query: original search query — used for market comp lookup during re-score. session_id: SSE session key; if set, updates are pushed to _update_queues[session_id]. + skip_seller_ids: seller IDs already enriched via Trading API — skip BTF for these. """ _BTF_MAX_PER_SEARCH = 3 _CAT_MAX_PER_SEARCH = 3 + _skip = skip_seller_ids or set() needs_btf: dict[str, str] = {} needs_categories: list[str] = [] @@ -301,6 +354,7 @@ def _trigger_scraper_enrichment( seller_listing_map.setdefault(sid, []).append(listing) if ((seller.account_age_days is None or seller.feedback_count == 0) and sid not in needs_btf + and sid not in _skip and len(needs_btf) < _BTF_MAX_PER_SEARCH): needs_btf[sid] = listing.platform_listing_id if (seller.category_history_json in ("{}", "", None) @@ -428,6 +482,61 @@ def _parse_terms(raw: str) -> list[str]: return [t.strip() for t in raw.split(",") if t.strip()] +def _try_trading_api_enrichment( + adapter: "EbayAdapter", + seller_ids: list[str], + user_db: Path, +) -> set[str]: + """Enrich sellers via Trading API GetUser if the user has a stored access token. + + Returns the set of seller_ids successfully enriched so the caller can skip + those sellers in the slower BTF background pass. + + Silently skips if: + - No tokens stored (user hasn't connected eBay account) + - Access token is expired and refresh fails + - Adapter is ScrapedEbayAdapter (no Trading API method) + """ + import time + + if not hasattr(adapter, "enrich_seller_trading_api"): + return set() + + tokens = _get_ebay_tokens(user_db) + if not tokens: + return set() + + access_token = tokens["access_token"] + + # Refresh if within 60s of expiry + if tokens["expires_at"] < time.time() + 60: + manager = _ebay_oauth_manager() + if manager is None: + return set() + try: + new_tokens = manager.refresh(tokens["refresh_token"]) + _save_ebay_tokens(user_db, new_tokens) + access_token = new_tokens.access_token + log.debug("eBay access token refreshed for Trading API enrichment") + except Exception as exc: + log.debug("eBay token refresh failed — skipping Trading API enrichment: %s", exc) + return set() + + enriched: set[str] = set() + for seller_id in seller_ids: + try: + ok = adapter.enrich_seller_trading_api(seller_id, access_token) + if ok: + enriched.add(seller_id) + except Exception as exc: + log.debug("Trading API enrichment failed for %s: %s", seller_id, exc) + + if enriched: + log.info("Trading API: enriched %d/%d sellers inline", len(enriched), len(seller_ids)) + + return enriched + + def _make_adapter(shared_store: Store, force: str = "auto"): """Return the appropriate adapter. @@ -559,95 +668,136 @@ def search( pages, len(ebay_queries), len(listings), q, ) - # Main-thread stores — fresh connections, same thread. - # shared_store: sellers, market_comps (all users share this data) - # user_store: listings, saved_searches (per-user in cloud mode, same file in local mode) - shared_store = Store(shared_db) - user_store = Store(user_db) + import sqlite3 as _sqlite3 - user_store.save_listings(listings) + affiliate_active = bool(os.environ.get("EBAY_AFFILIATE_CAMPAIGN_ID", "").strip()) - # Derive category_history from accumulated listing data — free for API adapter - # (category_name comes from Browse API response), no-op for scraper listings (category_name=None). - # Reads listings from user_store, writes seller categories to shared_store. - seller_ids = list({l.seller_platform_id for l in listings if l.seller_platform_id}) - n_cat = shared_store.refresh_seller_categories("ebay", seller_ids, listing_store=user_store) - if n_cat: - log.info("Category history derived for %d sellers from listing data", n_cat) - - # Re-fetch to hydrate staging fields (times_seen, first_seen_at, id, price_at_first_seen) - # that are only available from the DB after the upsert. - staged = user_store.get_listings_staged("ebay", [l.platform_listing_id for l in listings]) - listings = [staged.get(l.platform_listing_id, l) for l in listings] - - # BTF enrichment: scrape /itm/ pages for sellers missing account_age_days. - # Runs in the background so it doesn't delay the response. A session_id is - # generated so the frontend can open an SSE stream and receive live score - # updates as enrichment completes. + # Pre-register SSE queue so session_id is available regardless of DB outcome. session_id = str(uuid.uuid4()) _update_queues[session_id] = _queue.SimpleQueue() - _trigger_scraper_enrichment( - listings, shared_store, shared_db, - user_db=user_db, query=comp_query, session_id=session_id, - ) - scorer = TrustScorer(shared_store) - trust_scores_list = scorer.score_batch(listings, q) + try: + # Main-thread stores — fresh connections, same thread. + # shared_store: sellers, market_comps (all users share this data) + # user_store: listings, saved_searches (per-user in cloud mode, same file in local mode) + shared_store = Store(shared_db) + user_store = Store(user_db) - # Persist trust scores so background vision tasks have a row to UPDATE. - user_store.save_trust_scores(trust_scores_list) + user_store.save_listings(listings) - # Enqueue vision analysis for listings with photos — Paid tier and above. - features = compute_features(session.tier) - if features.photo_analysis: - _enqueue_vision_tasks(listings, trust_scores_list, session) + # Derive category_history from accumulated listing data — free for API adapter + # (category_name comes from Browse API response), no-op for scraper listings (category_name=None). + # Reads listings from user_store, writes seller categories to shared_store. + seller_ids = list({l.seller_platform_id for l in listings if l.seller_platform_id}) + n_cat = shared_store.refresh_seller_categories("ebay", seller_ids, listing_store=user_store) + if n_cat: + log.info("Category history derived for %d sellers from listing data", n_cat) - query_hash = hashlib.md5(comp_query.encode()).hexdigest() - comp = shared_store.get_market_comp("ebay", query_hash) - market_price = comp.median_price if comp else None + # Re-fetch to hydrate staging fields (times_seen, first_seen_at, id, price_at_first_seen) + # that are only available from the DB after the upsert. + staged = user_store.get_listings_staged("ebay", [l.platform_listing_id for l in listings]) + listings = [staged.get(l.platform_listing_id, l) for l in listings] - # Serialize — keyed by platform_listing_id for easy Vue lookup - trust_map = { - listing.platform_listing_id: dataclasses.asdict(ts) - for listing, ts in zip(listings, trust_scores_list) - if ts is not None - } - seller_map = { - listing.seller_platform_id: dataclasses.asdict( - shared_store.get_seller("ebay", listing.seller_platform_id) + # Trading API enrichment: if the user has connected their eBay account, use + # Trading API GetUser to instantly fill account_age_days for sellers missing it. + # This is synchronous (~200ms per seller) but only runs for sellers that need + # enrichment — typically a small subset. Sellers resolved here are excluded from + # the slower BTF Playwright background pass. + _main_adapter = _make_adapter(shared_store, adapter) + sellers_needing_age = [ + l.seller_platform_id for l in listings + if l.seller_platform_id + and shared_store.get_seller("ebay", l.seller_platform_id) is not None + and shared_store.get_seller("ebay", l.seller_platform_id).account_age_days is None + ] + # Deduplicate while preserving order + seen: set[str] = set() + sellers_needing_age = [s for s in sellers_needing_age if not (s in seen or seen.add(s))] # type: ignore[func-returns-value] + trading_api_enriched = _try_trading_api_enrichment( + _main_adapter, sellers_needing_age, user_db ) - for listing in listings - if listing.seller_platform_id - and shared_store.get_seller("ebay", listing.seller_platform_id) - } - # Build a preference reader for affiliate URL wrapping. - # Anonymous and guest users always use env-var mode: no opt-out or BYOK lookup. - _is_unauthed = session.user_id == "anonymous" or session.user_id.startswith("guest:") - _pref_store = None if _is_unauthed else user_store - - def _get_pref(uid: Optional[str], path: str, default=None): - return _pref_store.get_user_preference(path, default=default) # type: ignore[union-attr] - - def _serialize_listing(l: object) -> dict: - d = dataclasses.asdict(l) - d["url"] = _wrap_affiliate_url( - d["url"], - retailer="ebay", - user_id=None if _is_unauthed else session.user_id, - get_preference=_get_pref if _pref_store is not None else None, + # BTF enrichment: scrape /itm/ pages for sellers still missing account_age_days + # after the Trading API pass. Runs in the background so it doesn't delay the + # response. Live score updates are pushed to the pre-registered SSE queue. + _trigger_scraper_enrichment( + listings, shared_store, shared_db, + user_db=user_db, query=comp_query, session_id=session_id, + skip_seller_ids=trading_api_enriched, ) - return d - return { - "listings": [_serialize_listing(l) for l in listings], - "trust_scores": trust_map, - "sellers": seller_map, - "market_price": market_price, - "adapter_used": adapter_used, - "affiliate_active": bool(os.environ.get("EBAY_AFFILIATE_CAMPAIGN_ID", "").strip()), - "session_id": session_id, - } + scorer = TrustScorer(shared_store) + trust_scores_list = scorer.score_batch(listings, q) + + # Persist trust scores so background vision tasks have a row to UPDATE. + user_store.save_trust_scores(trust_scores_list) + + # Enqueue vision analysis for listings with photos — Paid tier and above. + features = compute_features(session.tier) + if features.photo_analysis: + _enqueue_vision_tasks(listings, trust_scores_list, session) + + query_hash = hashlib.md5(comp_query.encode()).hexdigest() + comp = shared_store.get_market_comp("ebay", query_hash) + market_price = comp.median_price if comp else None + + # Serialize — keyed by platform_listing_id for easy Vue lookup + trust_map = { + listing.platform_listing_id: dataclasses.asdict(ts) + for listing, ts in zip(listings, trust_scores_list) + if ts is not None + } + seller_map = { + listing.seller_platform_id: dataclasses.asdict( + shared_store.get_seller("ebay", listing.seller_platform_id) + ) + for listing in listings + if listing.seller_platform_id + and shared_store.get_seller("ebay", listing.seller_platform_id) + } + + # Build a preference reader for affiliate URL wrapping. + # Anonymous and guest users always use env-var mode: no opt-out or BYOK lookup. + _is_unauthed = session.user_id == "anonymous" or session.user_id.startswith("guest:") + _pref_store = None if _is_unauthed else user_store + + def _get_pref(uid: Optional[str], path: str, default=None): + return _pref_store.get_user_preference(path, default=default) # type: ignore[union-attr] + + def _serialize_listing(l: object) -> dict: + d = dataclasses.asdict(l) + d["url"] = _wrap_affiliate_url( + d["url"], + retailer="ebay", + user_id=None if _is_unauthed else session.user_id, + get_preference=_get_pref if _pref_store is not None else None, + ) + return d + + return { + "listings": [_serialize_listing(l) for l in listings], + "trust_scores": trust_map, + "sellers": seller_map, + "market_price": market_price, + "adapter_used": adapter_used, + "affiliate_active": affiliate_active, + "session_id": session_id, + } + + except _sqlite3.OperationalError as e: + # shared_db write contention under high concurrency — return raw listings + # without trust scores rather than a 500. The frontend handles missing trust_scores. + log.warning("search DB contention, returning raw listings (no trust scores): %s", e) + _update_queues.pop(session_id, None) + return { + "listings": [dataclasses.asdict(l) for l in listings], + "trust_scores": {}, + "sellers": {}, + "market_price": None, + "adapter_used": adapter_used, + "affiliate_active": affiliate_active, + "session_id": None, + } # ── On-demand enrichment ────────────────────────────────────────────────────── @@ -858,6 +1008,88 @@ def mark_saved_search_run(saved_id: int, session: CloudUser = Depends(get_sessio return {"ok": True} +class MonitorSettingsUpdate(BaseModel): + monitor_enabled: bool + poll_interval_min: int = 60 + min_trust_score: int = 60 + + +@app.patch("/api/saved-searches/{saved_id}/monitor", status_code=200) +def update_monitor_settings( + saved_id: int, + body: MonitorSettingsUpdate, + session: CloudUser = Depends(get_session), +): + from api.cloud_session import _LOCAL_SNIPE_DB, CLOUD_MODE, _shared_db_path + from app.tiers import can_use, get_limit + + features = compute_features(session.tier) + if not features.background_monitoring: + raise HTTPException(status_code=403, detail="Background monitoring requires a paid plan.") + + user_store = Store(session.user_db) + + if body.monitor_enabled: + limit = get_limit("background_monitoring", session.tier) + if limit is not None: + active_count = user_store.count_active_monitors() + # Don't count the search being updated — it might already be enabled. + searches = user_store.list_saved_searches() + already_enabled = any(s.id == saved_id and s.monitor_enabled for s in searches) + if not already_enabled and active_count >= limit: + raise HTTPException( + status_code=403, + detail=f"Your plan allows up to {limit} active monitors. Disable one to add another.", + ) + + # Clamp values to sane bounds. + interval = max(15, min(body.poll_interval_min, 1440)) + threshold = max(0, min(body.min_trust_score, 100)) + + user_store.update_monitor_settings( + saved_id, + monitor_enabled=body.monitor_enabled, + poll_interval_min=interval, + min_trust_score=threshold, + ) + + # Sync to the cross-user registry in sched_db. + sched_db = _shared_db_path() if CLOUD_MODE else _LOCAL_SNIPE_DB + sched_store = Store(sched_db) + if body.monitor_enabled: + sched_store.upsert_active_monitor(str(session.user_db), saved_id, interval) + else: + sched_store.remove_active_monitor(str(session.user_db), saved_id) + + return {"ok": True, "monitor_enabled": body.monitor_enabled, "poll_interval_min": interval} + + +# ── Watch Alerts ────────────────────────────────────────────────────────────── + +@app.get("/api/alerts") +def list_alerts( + include_dismissed: bool = False, + session: CloudUser = Depends(get_session), +): + user_store = Store(session.user_db) + alerts = user_store.list_alerts(include_dismissed=include_dismissed) + return { + "alerts": [dataclasses.asdict(a) for a in alerts], + "unread_count": user_store.count_undismissed_alerts(), + } + + +@app.post("/api/alerts/{alert_id}/dismiss", status_code=204) +def dismiss_alert(alert_id: int, session: CloudUser = Depends(get_session)): + Store(session.user_db).dismiss_alert(alert_id) + + +@app.post("/api/alerts/dismiss-all", status_code=200) +def dismiss_all_alerts(session: CloudUser = Depends(get_session)): + count = Store(session.user_db).dismiss_all_alerts() + return {"dismissed": count} + + # ── Community Trust Signals ─────────────────────────────────────────────────── # Signals live in shared_db so feedback aggregates across all users. @@ -1134,3 +1366,164 @@ async def build_search_query( } +# ── eBay OAuth (Authorization Code) ─────────────────────────────────────────── +# Allows paid-tier users to connect their eBay account for instant trust scores +# via Trading API GetUser (account age + per-category feedback) instead of +# Playwright scraping. +# +# Prerequisites: +# EBAY_RUNAME — RuName from eBay developer console (OAuth redirect name) +# EBAY_OAUTH_REDIRECT_URI — Full HTTPS callback URL registered with that RuName +# e.g. https://menagerie.circuitforge.tech/snipe/api/ebay/callback +# +# Flow: /api/ebay/connect → eBay → /api/ebay/callback → stored tokens → instant enrichment + +def _ebay_oauth_manager() -> "EbayUserTokenManager | None": + """Return a configured EbayUserTokenManager, or None if EBAY_RUNAME not set.""" + from circuitforge_core.platforms.ebay.oauth import EbayUserTokenManager + runame = os.environ.get("EBAY_RUNAME", "").strip() + redirect_uri = os.environ.get("EBAY_OAUTH_REDIRECT_URI", "").strip() + if not runame or not redirect_uri: + return None + client_id, client_secret, env = _ebay_creds() + if not client_id or not client_secret: + return None + return EbayUserTokenManager( + client_id=client_id, + client_secret=client_secret, + runame=runame, + redirect_uri=redirect_uri, + env=env, + ) + + +def _get_ebay_tokens(user_db: Path) -> "dict | None": + """Load stored eBay user tokens from the per-user DB. Returns None if not connected.""" + import sqlite3 + try: + conn = sqlite3.connect(user_db) + row = conn.execute( + "SELECT access_token, refresh_token, expires_at, scopes FROM ebay_user_tokens LIMIT 1" + ).fetchone() + conn.close() + if row: + return {"access_token": row[0], "refresh_token": row[1], "expires_at": row[2], "scopes": row[3]} + except Exception: + pass + return None + + +def _save_ebay_tokens(user_db: Path, tokens: "EbayUserTokens") -> None: + """Persist eBay tokens into the per-user DB (single-row table — delete + insert).""" + import sqlite3 + scopes_str = " ".join(tokens.scopes) if isinstance(tokens.scopes, list) else (tokens.scopes or "") + conn = sqlite3.connect(user_db) + try: + conn.execute("DELETE FROM ebay_user_tokens") + conn.execute( + "INSERT INTO ebay_user_tokens (access_token, refresh_token, expires_at, scopes, last_refreshed) VALUES (?, ?, ?, ?, datetime('now'))", + (tokens.access_token, tokens.refresh_token, tokens.expires_at, scopes_str), + ) + conn.commit() + finally: + conn.close() + + +@app.get("/api/ebay/connect") +def ebay_oauth_connect(session: CloudUser = Depends(get_session)): + """Redirect the user to eBay OAuth authorization. + + Requires Paid tier or local mode. Returns a redirect URL for the frontend + to navigate to (frontend opens in same tab or popup). + """ + from fastapi.responses import JSONResponse + import secrets + + features = compute_features(session.tier) + if not features.photo_analysis and session.tier != "local": + # Reuse photo_analysis flag as proxy for paid+ — both require paid tier + raise HTTPException(status_code=402, detail="eBay account connection requires Paid tier.") + + manager = _ebay_oauth_manager() + if manager is None: + raise HTTPException(status_code=503, detail="eBay OAuth not configured (EBAY_RUNAME missing).") + + state = secrets.token_urlsafe(16) + auth_url = manager.get_authorization_url(state=state) + return JSONResponse({"auth_url": auth_url, "state": state}) + + +@app.get("/api/ebay/callback") +def ebay_oauth_callback( + code: str = "", + state: str = "", + error: str = "", + error_description: str = "", + session: CloudUser = Depends(get_session), +): + """Handle eBay OAuth callback. Exchanges auth code for tokens and stores them. + + eBay redirects here after the user authorizes (or denies) the connection. + On success, tokens are persisted to the per-user DB and the user is + redirected to the settings page. + """ + from fastapi.responses import RedirectResponse + + base = os.environ.get("VITE_BASE_URL", "").rstrip("/") or "" + + if error: + log.warning("eBay OAuth error: %s — %s", error, error_description) + return RedirectResponse(f"{base}/settings?ebay_error={error}") + + if not code: + raise HTTPException(status_code=400, detail="Missing authorization code.") + + manager = _ebay_oauth_manager() + if manager is None: + raise HTTPException(status_code=503, detail="eBay OAuth not configured.") + + try: + tokens = manager.exchange_code(code) + except Exception as exc: + log.error("eBay token exchange failed: %s", exc) + raise HTTPException(status_code=502, detail=f"Token exchange failed: {exc}") + + _save_ebay_tokens(session.user_db, tokens) + log.info("eBay OAuth: tokens stored for user %s", session.user_id) + return RedirectResponse(f"{base}/settings?ebay_connected=1") + + +@app.get("/api/ebay/status") +def ebay_oauth_status(session: CloudUser = Depends(get_session)): + """Return eBay connection status for the current user.""" + import time + + tokens = _get_ebay_tokens(session.user_db) + oauth_configured = _ebay_oauth_manager() is not None + + if not tokens: + return {"connected": False, "oauth_available": oauth_configured} + + expired = tokens["expires_at"] < time.time() + return { + "connected": True, + "oauth_available": oauth_configured, + "access_token_expired": expired, + "scopes": tokens["scopes"].split() if tokens["scopes"] else [], + } + + +@app.delete("/api/ebay/disconnect", status_code=204) +def ebay_oauth_disconnect(session: CloudUser = Depends(get_session)): + """Remove stored eBay tokens for the current user.""" + import sqlite3 + try: + conn = sqlite3.connect(session.user_db) + conn.execute("DELETE FROM ebay_user_tokens") + conn.commit() + conn.close() + log.info("eBay OAuth: tokens removed for user %s", session.user_id) + except Exception as exc: + log.warning("eBay disconnect failed: %s", exc) + +