snipe/api/main.py
pyr0ball 98695b00f0 feat(snipe): eBay trust scoring MVP — search, filters, enrichment, comps
Core trust scoring:
- Five metadata signals (account age, feedback count/ratio, price vs market,
  category history), composited 0–100
- CV-based price signal suppression for heterogeneous search results
  (e.g. mixed laptop generations won't false-positive suspicious_price)
- Expanded scratch/dent title detection: evasive redirects, functional problem
  phrases, DIY/repair indicators
- Hard filters: new_account, established_bad_actor
- Soft flags: low_feedback, suspicious_price, duplicate_photo, scratch_dent,
  long_on_market, significant_price_drop

Search & filtering:
- Browse API adapter (up to 200 items/page) + Playwright scraper fallback
- OR-group query expansion for comprehensive variant coverage
- Must-include (AND/ANY/groups), must-exclude, category, price range filters
- Saved searches with full filter round-trip via URL params

Seller enrichment:
- Background BTF /itm/ scraping for account age (Kasada-safe headed Chromium)
- On-demand enrichment: POST /api/enrich + ListingCard ↻ button
- Category history derived from Browse API categories field (free, no extra calls)
- Shopping API GetUserProfile inline enrichment for API adapter

Market comps:
- eBay Marketplace Insights API with Browse API fallback (catches 403 + 404)
- Comps prioritised in ThreadPoolExecutor (submitted first)

Infrastructure:
- Staging DB fields: times_seen, first_seen_at, price_at_first_seen, category_name
- Migrations 004 (staging tracking) + 005 (listing category)
- eBay webhook handler stub
- Cloud compose stack (compose.cloud.yml)
- Vue frontend: search store, saved searches store, ListingCard, filter sidebar

Docs:
- README fully rewritten to reflect MVP status + full feature documentation
- Roadmap table linked to all 13 Forgejo issues
2026-03-26 23:37:09 -07:00

395 lines
16 KiB
Python

"""Snipe FastAPI — search endpoint wired to ScrapedEbayAdapter + TrustScorer."""
from __future__ import annotations
import dataclasses
import hashlib
import logging
import os
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from fastapi.middleware.cors import CORSMiddleware
from circuitforge_core.config import load_env
from app.db.store import Store
from app.db.models import SavedSearch as SavedSearchModel
from app.platforms import SearchFilters
from app.platforms.ebay.scraper import ScrapedEbayAdapter
from app.platforms.ebay.adapter import EbayAdapter
from app.platforms.ebay.auth import EbayTokenManager
from app.platforms.ebay.query_builder import expand_queries, parse_groups
from app.trust import TrustScorer
from api.ebay_webhook import router as ebay_webhook_router
load_env(Path(".env"))

log = logging.getLogger(__name__)

# DB path is configurable via SNIPE_DB (default data/snipe.db). Use
# parents=True so a nested override like /var/lib/snipe/db/snipe.db works —
# plain mkdir(exist_ok=True) raises FileNotFoundError when intermediate
# directories are missing.
_DB_PATH = Path(os.environ.get("SNIPE_DB", "data/snipe.db"))
_DB_PATH.parent.mkdir(parents=True, exist_ok=True)
def _ebay_creds() -> tuple[str, str, str]:
"""Return (client_id, client_secret, env) from env vars.
New names: EBAY_APP_ID / EBAY_CERT_ID (sandbox: EBAY_SANDBOX_APP_ID / EBAY_SANDBOX_CERT_ID)
Legacy fallback: EBAY_CLIENT_ID / EBAY_CLIENT_SECRET
"""
env = os.environ.get("EBAY_ENV", "production").strip()
if env == "sandbox":
client_id = os.environ.get("EBAY_SANDBOX_APP_ID", "").strip()
client_secret = os.environ.get("EBAY_SANDBOX_CERT_ID", "").strip()
else:
client_id = (os.environ.get("EBAY_APP_ID") or os.environ.get("EBAY_CLIENT_ID", "")).strip()
client_secret = (os.environ.get("EBAY_CERT_ID") or os.environ.get("EBAY_CLIENT_SECRET", "")).strip()
return client_id, client_secret, env
# Application instance — webhook router mounted alongside the search/enrich routes.
app = FastAPI(title="Snipe API", version="0.1.0")
app.include_router(ebay_webhook_router)
# Wide-open CORS: the Vue frontend is served from a different origin.
# NOTE(review): "*" origins/methods/headers are fine for local dev —
# tighten before any public deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.get("/api/health")
def health():
    """Liveness probe — static payload, touches no dependencies."""
    return {"status": "ok"}
def _trigger_scraper_enrichment(listings: list, store: Store) -> None:
    """Fire-and-forget background enrichment for missing seller signals.
    Two enrichment passes run concurrently in the same daemon thread:
    1. BTF (/itm/ pages) — fills account_age_days for sellers where it is None.
    2. _ssn search pages — fills category_history_json for sellers with no history.
    The main response returns immediately; enriched data lands in the DB for
    future searches. Uses ScrapedEbayAdapter's Playwright stack regardless of
    which adapter was used for the main search (Shopping API handles age for
    the API adapter inline; BTF is the fallback for no-creds / scraper mode).
    """
    # Caps per search: limits Playwright sessions launched in the background so we
    # don't hammer Kasada or spin up dozens of Xvfb instances after a large search.
    # Remaining sellers get enriched incrementally on subsequent searches.
    _BTF_MAX_PER_SEARCH = 3
    _CAT_MAX_PER_SEARCH = 3
    # needs_btf maps seller id -> one of their listing ids (the /itm/ page to scrape).
    needs_btf: dict[str, str] = {}
    needs_categories: list[str] = []
    for listing in listings:
        sid = listing.seller_platform_id
        if not sid:
            continue
        seller = store.get_seller("ebay", sid)
        if not seller:
            # Seller row not yet persisted — nothing to hang enrichment on.
            continue
        # Age pass: only sellers with unknown account age, up to the cap.
        if (seller.account_age_days is None
                and sid not in needs_btf
                and len(needs_btf) < _BTF_MAX_PER_SEARCH):
            needs_btf[sid] = listing.platform_listing_id
        # Category pass: only sellers with empty/missing history, up to the cap.
        if (seller.category_history_json in ("{}", "", None)
                and sid not in needs_categories
                and len(needs_categories) < _CAT_MAX_PER_SEARCH):
            needs_categories.append(sid)
    if not needs_btf and not needs_categories:
        return
    log.info(
        "Scraper enrichment: %d BTF age + %d category pages queued",
        len(needs_btf), len(needs_categories),
    )

    def _run():
        # Fresh Store inside the thread — sqlite connections are not shared
        # across threads (see the same-thread note in search()).
        try:
            enricher = ScrapedEbayAdapter(Store(_DB_PATH))
            if needs_btf:
                enricher.enrich_sellers_btf(needs_btf, max_workers=2)
                log.info("BTF enrichment complete for %d sellers", len(needs_btf))
            if needs_categories:
                enricher.enrich_sellers_categories(needs_categories, max_workers=2)
                log.info("Category enrichment complete for %d sellers", len(needs_categories))
        except Exception as e:
            # Best-effort: enrichment failure must never surface to the request path.
            log.warning("Scraper enrichment failed: %s", e)

    import threading
    t = threading.Thread(target=_run, daemon=True)
    t.start()
def _parse_terms(raw: str) -> list[str]:
"""Split a comma-separated keyword string into non-empty, stripped terms."""
return [t.strip() for t in raw.split(",") if t.strip()]
def _make_adapter(store: Store, force: str = "auto"):
    """Build the search adapter selected by *force*.

    force: "auto" | "api" | "scraper"
        auto    — Browse API when credentials exist, otherwise the scraper
        api     — Browse API; raises ValueError without credentials
        scraper — Playwright scraper regardless of credentials
    """
    client_id, client_secret, env = _ebay_creds()
    creds_present = bool(client_id and client_secret)
    if force == "scraper":
        return ScrapedEbayAdapter(store)
    if force == "api" and not creds_present:
        raise ValueError("adapter=api requested but no eBay API credentials configured")
    if force == "api" or creds_present:
        return EbayAdapter(EbayTokenManager(client_id, client_secret, env), store, env=env)
    # auto with no credentials — degrade to the scraper.
    log.debug("No eBay API credentials — using scraper adapter (partial trust scores)")
    return ScrapedEbayAdapter(store)
def _adapter_name(force: str = "auto") -> str:
    """Name of the adapter that would be used — without constructing it."""
    if force == "scraper":
        return "scraper"
    if force == "api":
        return "api"
    client_id, client_secret, _ = _ebay_creds()
    if force == "auto" and client_id and client_secret:
        return "api"
    return "scraper"
@app.get("/api/search")
def search(
    q: str = "",
    max_price: float = 0,
    min_price: float = 0,
    pages: int = 1,
    must_include: str = "",          # raw filter string; client-side always applied
    must_include_mode: str = "all",  # "all" | "any" | "groups" — drives eBay expansion
    must_exclude: str = "",          # comma-separated; forwarded to eBay -term + client-side
    category_id: str = "",           # eBay category ID — forwarded to Browse API / scraper _sacat
    adapter: str = "auto",           # "auto" | "api" | "scraper" — override adapter selection
):
    """Search eBay, persist results, and return listings + trust scores.

    Search queries and a completed-sales comps fetch run concurrently in a
    thread pool; listings are then upserted, re-hydrated with staging fields,
    and scored. Raises HTTPException 502 if the underlying search fails.
    """
    # Empty query: return a fully-shaped empty payload so the frontend
    # needs no special case.
    if not q.strip():
        return {"listings": [], "trust_scores": {}, "sellers": {}, "market_price": None, "adapter_used": _adapter_name(adapter)}
    must_exclude_terms = _parse_terms(must_exclude)
    # In Groups mode, expand OR groups into multiple targeted eBay queries to
    # guarantee comprehensive result coverage — eBay relevance won't silently drop variants.
    if must_include_mode == "groups" and must_include.strip():
        or_groups = parse_groups(must_include)
        ebay_queries = expand_queries(q, or_groups)
    else:
        ebay_queries = [q]
    base_filters = SearchFilters(
        max_price=max_price if max_price > 0 else None,  # 0 means "no bound"
        min_price=min_price if min_price > 0 else None,
        pages=max(1, pages),
        must_exclude=must_exclude_terms,  # forwarded to eBay -term by the scraper
        category_id=category_id.strip() or None,
    )
    adapter_used = _adapter_name(adapter)

    # Each thread creates its own Store — sqlite3 check_same_thread=True.
    def _run_search(ebay_query: str) -> list:
        return _make_adapter(Store(_DB_PATH), adapter).search(ebay_query, base_filters)

    def _run_comps() -> None:
        # Comps are best-effort: failures are logged, never fatal to the search.
        try:
            _make_adapter(Store(_DB_PATH), adapter).get_completed_sales(q, pages)
        except Exception:
            log.warning("comps: unhandled exception for %r", q, exc_info=True)

    try:
        # Comps submitted first — guarantees an immediate worker slot even at max concurrency.
        # Seller enrichment runs after the executor exits (background thread), so comps are
        # always prioritised over tracking seller age / category history.
        max_workers = min(len(ebay_queries) + 1, 5)
        with ThreadPoolExecutor(max_workers=max_workers) as ex:
            comps_future = ex.submit(_run_comps)
            search_futures = [ex.submit(_run_search, eq) for eq in ebay_queries]
            # Merge and deduplicate across all search queries
            seen_ids: set[str] = set()
            listings: list = []
            for fut in search_futures:
                for listing in fut.result():
                    if listing.platform_listing_id not in seen_ids:
                        seen_ids.add(listing.platform_listing_id)
                        listings.append(listing)
            comps_future.result()  # side-effect: market comp written to DB
    except Exception as e:
        log.warning("eBay scrape failed: %s", e)
        raise HTTPException(status_code=502, detail=f"eBay search failed: {e}")
    log.info("Multi-search: %d queries → %d unique listings", len(ebay_queries), len(listings))
    # Main-thread store for all post-search reads/writes — fresh connection, same thread.
    store = Store(_DB_PATH)
    store.save_listings(listings)
    # Derive category_history from accumulated listing data — free for API adapter
    # (category_name comes from Browse API response), no-op for scraper listings (category_name=None).
    seller_ids = list({l.seller_platform_id for l in listings if l.seller_platform_id})
    n_cat = store.refresh_seller_categories("ebay", seller_ids)
    if n_cat:
        log.info("Category history derived for %d sellers from listing data", n_cat)
    # Re-fetch to hydrate staging fields (times_seen, first_seen_at, id, price_at_first_seen)
    # that are only available from the DB after the upsert.
    staged = store.get_listings_staged("ebay", [l.platform_listing_id for l in listings])
    listings = [staged.get(l.platform_listing_id, l) for l in listings]
    # BTF enrichment: scrape /itm/ pages for sellers missing account_age_days.
    # Runs in the background so it doesn't delay the response; next search of
    # the same sellers will have full scores.
    _trigger_scraper_enrichment(listings, store)
    scorer = TrustScorer(store)
    trust_scores_list = scorer.score_batch(listings, q)
    # Market comp keyed by md5 of the raw query string — cache key, not security.
    query_hash = hashlib.md5(q.encode()).hexdigest()
    comp = store.get_market_comp("ebay", query_hash)
    market_price = comp.median_price if comp else None
    # Serialize — keyed by platform_listing_id for easy Vue lookup
    trust_map = {
        listing.platform_listing_id: dataclasses.asdict(ts)
        for listing, ts in zip(listings, trust_scores_list)
        if ts is not None
    }
    seller_map = {
        listing.seller_platform_id: dataclasses.asdict(
            store.get_seller("ebay", listing.seller_platform_id)
        )
        for listing in listings
        if listing.seller_platform_id
        and store.get_seller("ebay", listing.seller_platform_id)
    }
    return {
        "listings": [dataclasses.asdict(l) for l in listings],
        "trust_scores": trust_map,
        "sellers": seller_map,
        "market_price": market_price,
        "adapter_used": adapter_used,
    }
# ── On-demand enrichment ──────────────────────────────────────────────────────
@app.post("/api/enrich")
def enrich_seller(seller: str, listing_id: str, query: str = ""):
    """Synchronous on-demand enrichment for a single seller + re-score.
    Runs enrichment paths in parallel:
    - Shopping API GetUserProfile (fast, ~500ms) — account_age_days if API creds present
    - BTF /itm/ Playwright scrape (~20s) — account_age_days fallback
    - _ssn Playwright scrape (~20s) — category_history_json
    BTF and _ssn run concurrently; total wall time ~20s when Playwright needed.
    Returns the updated trust_score and seller so the frontend can patch in-place.
    """
    import threading
    store = Store(_DB_PATH)
    seller_obj = store.get_seller("ebay", seller)
    if not seller_obj:
        raise HTTPException(status_code=404, detail=f"Seller '{seller}' not found")
    # Fast path: Shopping API for account age (inline, no Playwright)
    try:
        api_adapter = _make_adapter(store, "api")
        # hasattr guard — only the API adapter exposes Shopping API enrichment.
        if hasattr(api_adapter, "enrich_sellers_shopping_api"):
            api_adapter.enrich_sellers_shopping_api([seller])
    except Exception:
        pass  # no API creds — fall through to BTF
    # Re-read: the fast path may have just filled account_age_days.
    seller_obj = store.get_seller("ebay", seller)
    needs_btf = seller_obj is not None and seller_obj.account_age_days is None
    needs_categories = seller_obj is None or seller_obj.category_history_json in ("{}", "", None)
    # Slow path: Playwright for remaining gaps (BTF + _ssn in parallel threads)
    if needs_btf or needs_categories:
        # Each thread gets its own Store-backed adapter (sqlite same-thread rule).
        scraper = ScrapedEbayAdapter(Store(_DB_PATH))
        errors: list[Exception] = []

        def _btf():
            try:
                scraper.enrich_sellers_btf({seller: listing_id}, max_workers=1)
            except Exception as e:
                errors.append(e)

        def _ssn():
            try:
                ScrapedEbayAdapter(Store(_DB_PATH)).enrich_sellers_categories([seller], max_workers=1)
            except Exception as e:
                errors.append(e)

        threads = []
        if needs_btf:
            threads.append(threading.Thread(target=_btf, daemon=True))
        if needs_categories:
            threads.append(threading.Thread(target=_ssn, daemon=True))
        for t in threads:
            t.start()
        for t in threads:
            # Bounded wait — a hung Playwright session must not wedge the request.
            t.join(timeout=60)
        if errors:
            log.warning("enrich_seller: %d scrape error(s): %s", len(errors), errors[0])
    # Re-fetch listing with staging fields, re-score
    staged = store.get_listings_staged("ebay", [listing_id])
    listing = staged.get(listing_id)
    if not listing:
        raise HTTPException(status_code=404, detail=f"Listing '{listing_id}' not found")
    scorer = TrustScorer(store)
    trust_list = scorer.score_batch([listing], query or listing.title)
    trust = trust_list[0] if trust_list else None
    seller_final = store.get_seller("ebay", seller)
    return {
        "trust_score": dataclasses.asdict(trust) if trust else None,
        "seller": dataclasses.asdict(seller_final) if seller_final else None,
    }
# ── Saved Searches ────────────────────────────────────────────────────────────
class SavedSearchCreate(BaseModel):
    """Request body for POST /api/saved-searches."""
    name: str
    query: str
    # JSON-encoded filter payload, round-tripped back to the frontend.
    filters_json: str = "{}"
@app.get("/api/saved-searches")
def list_saved_searches():
    """Return every saved search as a list of plain dicts."""
    saved = Store(_DB_PATH).list_saved_searches()
    return {"saved_searches": [dataclasses.asdict(item) for item in saved]}
@app.post("/api/saved-searches", status_code=201)
def create_saved_search(body: SavedSearchCreate):
    """Persist a new saved search (platform fixed to "ebay") and echo it back."""
    model = SavedSearchModel(
        name=body.name,
        query=body.query,
        platform="ebay",
        filters_json=body.filters_json,
    )
    stored = Store(_DB_PATH).save_saved_search(model)
    return dataclasses.asdict(stored)
@app.delete("/api/saved-searches/{saved_id}", status_code=204)
def delete_saved_search(saved_id: int) -> None:
    """Delete the saved search with *saved_id*; responds 204 with no body."""
    Store(_DB_PATH).delete_saved_search(saved_id)
@app.patch("/api/saved-searches/{saved_id}/run")
def mark_saved_search_run(saved_id: int):
    """Mark a saved search as run (delegates to Store.update_saved_search_last_run)."""
    Store(_DB_PATH).update_saved_search_last_run(saved_id)
    return {"ok": True}