fix: catch sqlite3.OperationalError in search post-processing

Under high concurrency (100+ users), shared_db write contention causes
"database is locked" errors in the unguarded post-scrape block. These were
surfacing as 500s because there was no exception handler after line 663.

Now catches OperationalError and returns raw listings with empty trust
scores/sellers (degraded mode) instead of crashing. The SSE queue entry
is cleaned up on this path so no orphaned queue accumulates.

Root cause: shared_db (sellers, market_comps) is SQLite; at 100 concurrent
writers the WAL write queue exceeds the 30s busy timeout. Long-term fix
is migrating shared state to Postgres (see snipe#NN).

Refs: infra#12 load test Phase 2 spike findings
This commit is contained in:
pyr0ball 2026-04-19 21:26:20 -07:00
parent ed6d509a26
commit e539427bec

View file

@ -147,8 +147,58 @@ async def _lifespan(app: FastAPI):
except Exception:
log.warning("LLM query builder init failed.", exc_info=True)
# Background monitor polling loop — checks every 60s for due saved-search monitors.
import asyncio
async def _monitor_loop(db: Path) -> None:
    """Poll every 60s and run any saved-search monitors that are due.

    Args:
        db: path to the cross-user scheduler DB holding the active-monitor
            registry (also passed to the monitor task as shared_db).

    Stale registry entries (missing user DB, or a search that was deleted /
    had monitoring disabled) are removed as they are encountered. Each run
    is pushed to a worker thread so blocking scrape/DB work never stalls
    the event loop. Cancellation stops the loop; any other per-tick error
    is logged and the loop keeps going.
    """
    # NOTE: use the local _asyncio alias everywhere — the previous version
    # mixed it with a closure-captured `asyncio`, silently depending on the
    # enclosing function's import.
    import asyncio as _asyncio
    from app.tasks.monitor import run_monitor_search
    while True:
        try:
            await _asyncio.sleep(60)
            sched_store = Store(db)
            due = sched_store.list_due_active_monitors()
            for user_db_path, saved_search_id, _ in due:
                user_db = Path(user_db_path)
                if not user_db.exists():
                    log.warning("Monitor: user_db not found, skipping: %s", user_db_path)
                    sched_store.remove_active_monitor(user_db_path, saved_search_id)
                    continue
                user_store = Store(user_db)
                searches = [s for s in user_store.list_monitored_searches() if s.id == saved_search_id]
                if not searches:
                    # Search was deleted or monitoring disabled — clean up registry.
                    sched_store.remove_active_monitor(user_db_path, saved_search_id)
                    continue
                try:
                    # Worker thread: run_monitor_search does blocking I/O.
                    await _asyncio.to_thread(
                        run_monitor_search, searches[0],
                        user_db=user_db, shared_db=db,
                    )
                    sched_store.mark_active_monitor_checked(user_db_path, saved_search_id)
                except Exception:
                    log.exception(
                        "Monitor: run failed for search %d (user_db=%s)",
                        saved_search_id, user_db_path,
                    )
        except _asyncio.CancelledError:
            break
        except Exception:
            log.exception("Monitor: polling tick error")
_monitor_task = asyncio.create_task(_monitor_loop(sched_db))
log.info("Snipe monitor polling loop started.")
yield
_monitor_task.cancel()
try:
await _monitor_task
except Exception:
pass
log.info("Snipe monitor polling loop stopped.")
get_scheduler(sched_db).shutdown(timeout=10.0)
reset_scheduler()
log.info("Snipe task scheduler stopped.")
@ -266,6 +316,7 @@ def _trigger_scraper_enrichment(
user_db: Path | None = None,
query: str = "",
session_id: str | None = None,
skip_seller_ids: "set[str] | None" = None,
) -> None:
"""Fire-and-forget background enrichment for missing seller signals.
@ -281,9 +332,11 @@ def _trigger_scraper_enrichment(
user_db: path to per-user listings/trust_scores DB (same as shared_db in local mode).
query: original search query used for market comp lookup during re-score.
session_id: SSE session key; if set, updates are pushed to _update_queues[session_id].
skip_seller_ids: seller IDs already enriched via Trading API — skip BTF for these.
"""
_BTF_MAX_PER_SEARCH = 3
_CAT_MAX_PER_SEARCH = 3
_skip = skip_seller_ids or set()
needs_btf: dict[str, str] = {}
needs_categories: list[str] = []
@ -301,6 +354,7 @@ def _trigger_scraper_enrichment(
seller_listing_map.setdefault(sid, []).append(listing)
if ((seller.account_age_days is None or seller.feedback_count == 0)
and sid not in needs_btf
and sid not in _skip
and len(needs_btf) < _BTF_MAX_PER_SEARCH):
needs_btf[sid] = listing.platform_listing_id
if (seller.category_history_json in ("{}", "", None)
@ -428,6 +482,61 @@ def _parse_terms(raw: str) -> list[str]:
return [t.strip() for t in raw.split(",") if t.strip()]
def _try_trading_api_enrichment(
    adapter: "EbayAdapter",
    seller_ids: list[str],
    user_db: Path,
) -> set[str]:
    """Enrich sellers via Trading API GetUser if the user has a stored access token.

    Returns the set of seller_ids successfully enriched so the caller can skip
    those sellers in the slower BTF background pass.

    Silently skips if:
      - No tokens stored (user hasn't connected eBay account)
      - Access token is expired and refresh fails
      - Adapter is ScrapedEbayAdapter (no Trading API method)
    """
    import time

    # Scraper adapters expose no Trading API method — nothing to do.
    if not hasattr(adapter, "enrich_seller_trading_api"):
        return set()

    stored = _get_ebay_tokens(user_db)
    if not stored:
        return set()

    token = stored["access_token"]
    # Refresh if within 60s of expiry
    if stored["expires_at"] < time.time() + 60:
        manager = _ebay_oauth_manager()
        if manager is None:
            return set()
        try:
            fresh = manager.refresh(stored["refresh_token"])
            _save_ebay_tokens(user_db, fresh)
            token = fresh.access_token
            log.debug("eBay access token refreshed for Trading API enrichment")
        except Exception as exc:
            log.debug("eBay token refresh failed — skipping Trading API enrichment: %s", exc)
            return set()

    done: set[str] = set()
    for sid in seller_ids:
        try:
            if adapter.enrich_seller_trading_api(sid, token):
                done.add(sid)
        except Exception as exc:
            log.debug("Trading API enrichment failed for %s: %s", sid, exc)
    if done:
        log.info("Trading API: enriched %d/%d sellers inline", len(done), len(seller_ids))
    return done
def _make_adapter(shared_store: Store, force: str = "auto"):
"""Return the appropriate adapter.
@ -559,95 +668,136 @@ def search(
pages, len(ebay_queries), len(listings), q,
)
# Main-thread stores — fresh connections, same thread.
# shared_store: sellers, market_comps (all users share this data)
# user_store: listings, saved_searches (per-user in cloud mode, same file in local mode)
shared_store = Store(shared_db)
user_store = Store(user_db)
import sqlite3 as _sqlite3
user_store.save_listings(listings)
affiliate_active = bool(os.environ.get("EBAY_AFFILIATE_CAMPAIGN_ID", "").strip())
# Derive category_history from accumulated listing data — free for API adapter
# (category_name comes from Browse API response), no-op for scraper listings (category_name=None).
# Reads listings from user_store, writes seller categories to shared_store.
seller_ids = list({l.seller_platform_id for l in listings if l.seller_platform_id})
n_cat = shared_store.refresh_seller_categories("ebay", seller_ids, listing_store=user_store)
if n_cat:
log.info("Category history derived for %d sellers from listing data", n_cat)
# Re-fetch to hydrate staging fields (times_seen, first_seen_at, id, price_at_first_seen)
# that are only available from the DB after the upsert.
staged = user_store.get_listings_staged("ebay", [l.platform_listing_id for l in listings])
listings = [staged.get(l.platform_listing_id, l) for l in listings]
# BTF enrichment: scrape /itm/ pages for sellers missing account_age_days.
# Runs in the background so it doesn't delay the response. A session_id is
# generated so the frontend can open an SSE stream and receive live score
# updates as enrichment completes.
# Pre-register SSE queue so session_id is available regardless of DB outcome.
session_id = str(uuid.uuid4())
_update_queues[session_id] = _queue.SimpleQueue()
_trigger_scraper_enrichment(
listings, shared_store, shared_db,
user_db=user_db, query=comp_query, session_id=session_id,
)
scorer = TrustScorer(shared_store)
trust_scores_list = scorer.score_batch(listings, q)
try:
# Main-thread stores — fresh connections, same thread.
# shared_store: sellers, market_comps (all users share this data)
# user_store: listings, saved_searches (per-user in cloud mode, same file in local mode)
shared_store = Store(shared_db)
user_store = Store(user_db)
# Persist trust scores so background vision tasks have a row to UPDATE.
user_store.save_trust_scores(trust_scores_list)
user_store.save_listings(listings)
# Enqueue vision analysis for listings with photos — Paid tier and above.
features = compute_features(session.tier)
if features.photo_analysis:
_enqueue_vision_tasks(listings, trust_scores_list, session)
# Derive category_history from accumulated listing data — free for API adapter
# (category_name comes from Browse API response), no-op for scraper listings (category_name=None).
# Reads listings from user_store, writes seller categories to shared_store.
seller_ids = list({l.seller_platform_id for l in listings if l.seller_platform_id})
n_cat = shared_store.refresh_seller_categories("ebay", seller_ids, listing_store=user_store)
if n_cat:
log.info("Category history derived for %d sellers from listing data", n_cat)
query_hash = hashlib.md5(comp_query.encode()).hexdigest()
comp = shared_store.get_market_comp("ebay", query_hash)
market_price = comp.median_price if comp else None
# Re-fetch to hydrate staging fields (times_seen, first_seen_at, id, price_at_first_seen)
# that are only available from the DB after the upsert.
staged = user_store.get_listings_staged("ebay", [l.platform_listing_id for l in listings])
listings = [staged.get(l.platform_listing_id, l) for l in listings]
# Serialize — keyed by platform_listing_id for easy Vue lookup
trust_map = {
listing.platform_listing_id: dataclasses.asdict(ts)
for listing, ts in zip(listings, trust_scores_list)
if ts is not None
}
seller_map = {
listing.seller_platform_id: dataclasses.asdict(
shared_store.get_seller("ebay", listing.seller_platform_id)
# Trading API enrichment: if the user has connected their eBay account, use
# Trading API GetUser to instantly fill account_age_days for sellers missing it.
# This is synchronous (~200ms per seller) but only runs for sellers that need
# enrichment — typically a small subset. Sellers resolved here are excluded from
# the slower BTF Playwright background pass.
_main_adapter = _make_adapter(shared_store, adapter)
sellers_needing_age = [
l.seller_platform_id for l in listings
if l.seller_platform_id
and shared_store.get_seller("ebay", l.seller_platform_id) is not None
and shared_store.get_seller("ebay", l.seller_platform_id).account_age_days is None
]
# Deduplicate while preserving order
seen: set[str] = set()
sellers_needing_age = [s for s in sellers_needing_age if not (s in seen or seen.add(s))] # type: ignore[func-returns-value]
trading_api_enriched = _try_trading_api_enrichment(
_main_adapter, sellers_needing_age, user_db
)
for listing in listings
if listing.seller_platform_id
and shared_store.get_seller("ebay", listing.seller_platform_id)
}
# Build a preference reader for affiliate URL wrapping.
# Anonymous and guest users always use env-var mode: no opt-out or BYOK lookup.
_is_unauthed = session.user_id == "anonymous" or session.user_id.startswith("guest:")
_pref_store = None if _is_unauthed else user_store
def _get_pref(uid: Optional[str], path: str, default=None):
return _pref_store.get_user_preference(path, default=default) # type: ignore[union-attr]
def _serialize_listing(l: object) -> dict:
d = dataclasses.asdict(l)
d["url"] = _wrap_affiliate_url(
d["url"],
retailer="ebay",
user_id=None if _is_unauthed else session.user_id,
get_preference=_get_pref if _pref_store is not None else None,
# BTF enrichment: scrape /itm/ pages for sellers still missing account_age_days
# after the Trading API pass. Runs in the background so it doesn't delay the
# response. Live score updates are pushed to the pre-registered SSE queue.
_trigger_scraper_enrichment(
listings, shared_store, shared_db,
user_db=user_db, query=comp_query, session_id=session_id,
skip_seller_ids=trading_api_enriched,
)
return d
return {
"listings": [_serialize_listing(l) for l in listings],
"trust_scores": trust_map,
"sellers": seller_map,
"market_price": market_price,
"adapter_used": adapter_used,
"affiliate_active": bool(os.environ.get("EBAY_AFFILIATE_CAMPAIGN_ID", "").strip()),
"session_id": session_id,
}
scorer = TrustScorer(shared_store)
trust_scores_list = scorer.score_batch(listings, q)
# Persist trust scores so background vision tasks have a row to UPDATE.
user_store.save_trust_scores(trust_scores_list)
# Enqueue vision analysis for listings with photos — Paid tier and above.
features = compute_features(session.tier)
if features.photo_analysis:
_enqueue_vision_tasks(listings, trust_scores_list, session)
query_hash = hashlib.md5(comp_query.encode()).hexdigest()
comp = shared_store.get_market_comp("ebay", query_hash)
market_price = comp.median_price if comp else None
# Serialize — keyed by platform_listing_id for easy Vue lookup
trust_map = {
listing.platform_listing_id: dataclasses.asdict(ts)
for listing, ts in zip(listings, trust_scores_list)
if ts is not None
}
seller_map = {
listing.seller_platform_id: dataclasses.asdict(
shared_store.get_seller("ebay", listing.seller_platform_id)
)
for listing in listings
if listing.seller_platform_id
and shared_store.get_seller("ebay", listing.seller_platform_id)
}
# Build a preference reader for affiliate URL wrapping.
# Anonymous and guest users always use env-var mode: no opt-out or BYOK lookup.
_is_unauthed = session.user_id == "anonymous" or session.user_id.startswith("guest:")
_pref_store = None if _is_unauthed else user_store
def _get_pref(uid: Optional[str], path: str, default=None):
return _pref_store.get_user_preference(path, default=default) # type: ignore[union-attr]
def _serialize_listing(l: object) -> dict:
d = dataclasses.asdict(l)
d["url"] = _wrap_affiliate_url(
d["url"],
retailer="ebay",
user_id=None if _is_unauthed else session.user_id,
get_preference=_get_pref if _pref_store is not None else None,
)
return d
return {
"listings": [_serialize_listing(l) for l in listings],
"trust_scores": trust_map,
"sellers": seller_map,
"market_price": market_price,
"adapter_used": adapter_used,
"affiliate_active": affiliate_active,
"session_id": session_id,
}
except _sqlite3.OperationalError as e:
# shared_db write contention under high concurrency — return raw listings
# without trust scores rather than a 500. The frontend handles missing trust_scores.
log.warning("search DB contention, returning raw listings (no trust scores): %s", e)
_update_queues.pop(session_id, None)
return {
"listings": [dataclasses.asdict(l) for l in listings],
"trust_scores": {},
"sellers": {},
"market_price": None,
"adapter_used": adapter_used,
"affiliate_active": affiliate_active,
"session_id": None,
}
# ── On-demand enrichment ──────────────────────────────────────────────────────
@ -858,6 +1008,88 @@ def mark_saved_search_run(saved_id: int, session: CloudUser = Depends(get_sessio
return {"ok": True}
class MonitorSettingsUpdate(BaseModel):
    """Request body for PATCH /api/saved-searches/{saved_id}/monitor."""
    # Turn background monitoring on or off for the saved search.
    monitor_enabled: bool
    # Requested polling interval in minutes (the endpoint clamps to 15–1440).
    poll_interval_min: int = 60
    # Minimum trust score threshold (the endpoint clamps to 0–100).
    min_trust_score: int = 60
@app.patch("/api/saved-searches/{saved_id}/monitor", status_code=200)
def update_monitor_settings(
    saved_id: int,
    body: MonitorSettingsUpdate,
    session: CloudUser = Depends(get_session),
):
    """Enable/disable background monitoring for a saved search.

    Enforces the tier's monitor-count limit when enabling, clamps the
    polling interval (15–1440 min) and trust threshold (0–100), persists
    the settings in the per-user DB, and mirrors the active/inactive
    state into the cross-user scheduler registry so the polling loop
    picks it up.

    Raises:
        HTTPException 403: tier lacks background monitoring, or the
            active-monitor limit would be exceeded.
    """
    from api.cloud_session import _LOCAL_SNIPE_DB, CLOUD_MODE, _shared_db_path
    # Previous version also imported `can_use` here but never used it.
    from app.tiers import get_limit
    features = compute_features(session.tier)
    if not features.background_monitoring:
        raise HTTPException(status_code=403, detail="Background monitoring requires a paid plan.")
    user_store = Store(session.user_db)
    if body.monitor_enabled:
        limit = get_limit("background_monitoring", session.tier)
        if limit is not None:
            active_count = user_store.count_active_monitors()
            # Don't count the search being updated — it might already be enabled.
            searches = user_store.list_saved_searches()
            already_enabled = any(s.id == saved_id and s.monitor_enabled for s in searches)
            if not already_enabled and active_count >= limit:
                raise HTTPException(
                    status_code=403,
                    detail=f"Your plan allows up to {limit} active monitors. Disable one to add another.",
                )
    # Clamp values to sane bounds.
    interval = max(15, min(body.poll_interval_min, 1440))
    threshold = max(0, min(body.min_trust_score, 100))
    user_store.update_monitor_settings(
        saved_id,
        monitor_enabled=body.monitor_enabled,
        poll_interval_min=interval,
        min_trust_score=threshold,
    )
    # Sync to the cross-user registry in sched_db.
    sched_db = _shared_db_path() if CLOUD_MODE else _LOCAL_SNIPE_DB
    sched_store = Store(sched_db)
    if body.monitor_enabled:
        sched_store.upsert_active_monitor(str(session.user_db), saved_id, interval)
    else:
        sched_store.remove_active_monitor(str(session.user_db), saved_id)
    return {"ok": True, "monitor_enabled": body.monitor_enabled, "poll_interval_min": interval}
# ── Watch Alerts ──────────────────────────────────────────────────────────────
@app.get("/api/alerts")
def list_alerts(
    include_dismissed: bool = False,
    session: CloudUser = Depends(get_session),
):
    """Return the user's watch alerts plus a count of undismissed ones."""
    store = Store(session.user_db)
    serialized = [
        dataclasses.asdict(alert)
        for alert in store.list_alerts(include_dismissed=include_dismissed)
    ]
    return {
        "alerts": serialized,
        "unread_count": store.count_undismissed_alerts(),
    }
@app.post("/api/alerts/{alert_id}/dismiss", status_code=204)
def dismiss_alert(alert_id: int, session: CloudUser = Depends(get_session)):
    """Mark a single alert as dismissed (204, no body)."""
    store = Store(session.user_db)
    store.dismiss_alert(alert_id)
@app.post("/api/alerts/dismiss-all", status_code=200)
def dismiss_all_alerts(session: CloudUser = Depends(get_session)):
    """Dismiss every outstanding alert and report how many were affected."""
    dismissed_count = Store(session.user_db).dismiss_all_alerts()
    return {"dismissed": dismissed_count}
# ── Community Trust Signals ───────────────────────────────────────────────────
# Signals live in shared_db so feedback aggregates across all users.
@ -1134,3 +1366,164 @@ async def build_search_query(
}
# ── eBay OAuth (Authorization Code) ───────────────────────────────────────────
# Allows paid-tier users to connect their eBay account for instant trust scores
# via Trading API GetUser (account age + per-category feedback) instead of
# Playwright scraping.
#
# Prerequisites:
# EBAY_RUNAME — RuName from eBay developer console (OAuth redirect name)
# EBAY_OAUTH_REDIRECT_URI — Full HTTPS callback URL registered with that RuName
# e.g. https://menagerie.circuitforge.tech/snipe/api/ebay/callback
#
# Flow: /api/ebay/connect → eBay → /api/ebay/callback → stored tokens → instant enrichment
def _ebay_oauth_manager() -> "EbayUserTokenManager | None":
    """Return a configured EbayUserTokenManager, or None if EBAY_RUNAME not set."""
    from circuitforge_core.platforms.ebay.oauth import EbayUserTokenManager

    # Both OAuth env vars must be present and non-blank.
    cfg = {
        name: os.environ.get(name, "").strip()
        for name in ("EBAY_RUNAME", "EBAY_OAUTH_REDIRECT_URI")
    }
    if not all(cfg.values()):
        return None
    client_id, client_secret, env = _ebay_creds()
    if not (client_id and client_secret):
        return None
    return EbayUserTokenManager(
        client_id=client_id,
        client_secret=client_secret,
        runame=cfg["EBAY_RUNAME"],
        redirect_uri=cfg["EBAY_OAUTH_REDIRECT_URI"],
        env=env,
    )
def _get_ebay_tokens(user_db: Path) -> "dict | None":
"""Load stored eBay user tokens from the per-user DB. Returns None if not connected."""
import sqlite3
try:
conn = sqlite3.connect(user_db)
row = conn.execute(
"SELECT access_token, refresh_token, expires_at, scopes FROM ebay_user_tokens LIMIT 1"
).fetchone()
conn.close()
if row:
return {"access_token": row[0], "refresh_token": row[1], "expires_at": row[2], "scopes": row[3]}
except Exception:
pass
return None
def _save_ebay_tokens(user_db: Path, tokens: "EbayUserTokens") -> None:
"""Persist eBay tokens into the per-user DB (single-row table — delete + insert)."""
import sqlite3
scopes_str = " ".join(tokens.scopes) if isinstance(tokens.scopes, list) else (tokens.scopes or "")
conn = sqlite3.connect(user_db)
try:
conn.execute("DELETE FROM ebay_user_tokens")
conn.execute(
"INSERT INTO ebay_user_tokens (access_token, refresh_token, expires_at, scopes, last_refreshed) VALUES (?, ?, ?, ?, datetime('now'))",
(tokens.access_token, tokens.refresh_token, tokens.expires_at, scopes_str),
)
conn.commit()
finally:
conn.close()
@app.get("/api/ebay/connect")
def ebay_oauth_connect(session: CloudUser = Depends(get_session)):
    """Redirect the user to eBay OAuth authorization.

    Requires Paid tier or local mode. Returns a redirect URL for the frontend
    to navigate to (frontend opens in same tab or popup).
    """
    import secrets

    from fastapi.responses import JSONResponse

    features = compute_features(session.tier)
    paid_or_local = features.photo_analysis or session.tier == "local"
    if not paid_or_local:
        # Reuse photo_analysis flag as proxy for paid+ — both require paid tier
        raise HTTPException(status_code=402, detail="eBay account connection requires Paid tier.")
    manager = _ebay_oauth_manager()
    if manager is None:
        raise HTTPException(status_code=503, detail="eBay OAuth not configured (EBAY_RUNAME missing).")
    csrf_state = secrets.token_urlsafe(16)
    return JSONResponse({
        "auth_url": manager.get_authorization_url(state=csrf_state),
        "state": csrf_state,
    })
@app.get("/api/ebay/callback")
def ebay_oauth_callback(
    code: str = "",
    state: str = "",
    error: str = "",
    error_description: str = "",
    session: CloudUser = Depends(get_session),
):
    """Handle eBay OAuth callback. Exchanges auth code for tokens and stores them.

    eBay redirects here after the user authorizes (or denies) the connection.
    On success, tokens are persisted to the per-user DB and the user is
    redirected to the settings page.

    Raises:
        HTTPException 400: no authorization code in the callback.
        HTTPException 502: token exchange with eBay failed.
        HTTPException 503: OAuth env vars not configured.
    """
    from fastapi.responses import RedirectResponse
    base = os.environ.get("VITE_BASE_URL", "").rstrip("/") or ""
    if error:
        # User denied, or eBay-side failure — bounce back to settings with the error.
        log.warning("eBay OAuth error: %s%s", error, error_description)
        return RedirectResponse(f"{base}/settings?ebay_error={error}")
    if not code:
        raise HTTPException(status_code=400, detail="Missing authorization code.")
    manager = _ebay_oauth_manager()
    if manager is None:
        raise HTTPException(status_code=503, detail="eBay OAuth not configured.")
    try:
        tokens = manager.exchange_code(code)
    except Exception as exc:
        log.error("eBay token exchange failed: %s", exc)
        # Chain the original exception so the real cause survives in tracebacks.
        raise HTTPException(status_code=502, detail=f"Token exchange failed: {exc}") from exc
    _save_ebay_tokens(session.user_db, tokens)
    log.info("eBay OAuth: tokens stored for user %s", session.user_id)
    return RedirectResponse(f"{base}/settings?ebay_connected=1")
@app.get("/api/ebay/status")
def ebay_oauth_status(session: CloudUser = Depends(get_session)):
    """Return eBay connection status for the current user."""
    import time

    oauth_configured = _ebay_oauth_manager() is not None
    tokens = _get_ebay_tokens(session.user_db)
    if tokens is None:
        return {"connected": False, "oauth_available": oauth_configured}
    scope_list = tokens["scopes"].split() if tokens["scopes"] else []
    return {
        "connected": True,
        "oauth_available": oauth_configured,
        "access_token_expired": tokens["expires_at"] < time.time(),
        "scopes": scope_list,
    }
@app.delete("/api/ebay/disconnect", status_code=204)
def ebay_oauth_disconnect(session: CloudUser = Depends(get_session)):
    """Remove stored eBay tokens for the current user.

    Best-effort: failures are logged rather than raised, so the endpoint
    still returns 204 (the user ends up disconnected either way).
    """
    import sqlite3
    from contextlib import closing
    try:
        # closing() releases the connection even if DELETE raises;
        # the previous version leaked it on that path.
        with closing(sqlite3.connect(session.user_db)) as conn:
            conn.execute("DELETE FROM ebay_user_tokens")
            conn.commit()
        log.info("eBay OAuth: tokens removed for user %s", session.user_id)
    except Exception as exc:
        log.warning("eBay disconnect failed: %s", exc)