feat: migrate shared_db (sellers/market_comps/blocklist) from SQLite to Postgres (#45)

Three-layer migration: SQLite Store remains for per-user tables (listings,
trust_scores, background_tasks, community_signals). Postgres takes over
for all high-contention shared tables.

Closes: #45
This commit is contained in:
pyr0ball 2026-05-22 15:49:02 -07:00
commit 341d66d5f0
16 changed files with 804 additions and 34 deletions

View file

@ -110,6 +110,13 @@ CF_APP_NAME=snipe
# Defaults to GPU_SERVER_URL if unset. # Defaults to GPU_SERVER_URL if unset.
# CF_ORCH_COORDINATOR_URL=http://10.1.10.71:7700 # CF_ORCH_COORDINATOR_URL=http://10.1.10.71:7700
# ── Shared Postgres (optional — strongly recommended for cloud/multi-user) ────
# When set, sellers, market_comps, reported_sellers, and scammer_blocklist are
# stored in Postgres instead of SQLite. Required to avoid database-locked errors
# under concurrent load (>10 simultaneous search users).
# Cloud instances: set to the cf-postgres DSN. Self-hosted: leave unset for SQLite.
# SNIPE_SHARED_DB_URL=postgresql://snipe:<password>@localhost:5432/snipe_shared
# ── Community DB (optional) ────────────────────────────────────────────────── # ── Community DB (optional) ──────────────────────────────────────────────────
# When set, seller trust signals (confirmed scammers added to blocklist) are # When set, seller trust signals (confirmed scammers added to blocklist) are
# published to the shared community PostgreSQL for cross-user signal aggregation. # published to the shared community PostgreSQL for cross-user signal aggregation.

View file

@ -5,6 +5,7 @@ WORKDIR /app
# System deps for Playwright/Chromium # System deps for Playwright/Chromium
RUN apt-get update && apt-get install -y --no-install-recommends \ RUN apt-get update && apt-get install -y --no-install-recommends \
xvfb \ xvfb \
libpq-dev \
&& rm -rf /var/lib/apt/lists/* && rm -rf /var/lib/apt/lists/*
# Install circuitforge-core from sibling directory (compose sets context: ..) # Install circuitforge-core from sibling directory (compose sets context: ..)

View file

@ -33,6 +33,7 @@ from api.cloud_session import CloudUser, compute_features, get_session
from api.ebay_webhook import router as ebay_webhook_router from api.ebay_webhook import router as ebay_webhook_router
from app.db.models import SavedSearch as SavedSearchModel from app.db.models import SavedSearch as SavedSearchModel
from app.db.models import ScammerEntry from app.db.models import ScammerEntry
from app.db.protocol import SharedTableProtocol
from app.db.store import Store from app.db.store import Store
from app.platforms import SUPPORTED_PLATFORMS, SearchFilters from app.platforms import SUPPORTED_PLATFORMS, SearchFilters
from app.platforms.ebay.adapter import EbayAdapter from app.platforms.ebay.adapter import EbayAdapter
@ -142,6 +143,19 @@ def _get_community_store() -> "SnipeCommunityStore | None":
return _community_store return _community_store
# ── Shared Postgres backend (optional — active when SNIPE_SHARED_DB_URL is set) ─
# Replaces the SQLite shared.db for sellers, market_comps, reported_sellers, and
# scammer_blocklist. ThreadedConnectionPool is thread-safe; one instance per process.
_pg_shared_store: "SharedTableProtocol | None" = None
def _make_shared_store(path: Path) -> SharedTableProtocol:
"""Return the active shared backend — Postgres if configured, SQLite otherwise."""
if _pg_shared_store is not None:
return _pg_shared_store
return Store(path)
# ── LLM Query Builder singletons (optional — requires LLM backend) ──────────── # ── LLM Query Builder singletons (optional — requires LLM backend) ────────────
_category_cache = None _category_cache = None
_query_translator = None _query_translator = None
@ -153,7 +167,7 @@ def _get_query_translator():
@asynccontextmanager @asynccontextmanager
async def _lifespan(app: FastAPI): async def _lifespan(app: FastAPI):
global _community_store global _community_store, _pg_shared_store
# Pre-warm the Chromium browser pool so the first scrape request does not # Pre-warm the Chromium browser pool so the first scrape request does not
# pay the full cold-start cost (5-10s Xvfb + browser launch). # pay the full cold-start cost (5-10s Xvfb + browser launch).
# Pool size is controlled via BROWSER_POOL_SIZE env var (default: 2). # Pool size is controlled via BROWSER_POOL_SIZE env var (default: 2).
@ -178,6 +192,21 @@ async def _lifespan(app: FastAPI):
get_scheduler(sched_db) get_scheduler(sched_db)
log.info("Snipe task scheduler started (db=%s)", sched_db) log.info("Snipe task scheduler started (db=%s)", sched_db)
# Shared Postgres backend — optional. Replaces SQLite for sellers, market_comps,
# reported_sellers, and scammer_blocklist under concurrent load.
snipe_shared_dsn = os.environ.get("SNIPE_SHARED_DB_URL", "")
if snipe_shared_dsn:
try:
from app.db.pg_shared import SnipeSharedDB, SnipeSharedStore as _SnipeSharedStore
_pg_db = SnipeSharedDB(snipe_shared_dsn)
_pg_db.run_migrations()
_pg_shared_store = _SnipeSharedStore(_pg_db)
log.info("Shared Postgres backend ready (sellers, market_comps, blocklist)")
except Exception:
log.exception(
"SNIPE_SHARED_DB_URL set but Postgres init failed — falling back to SQLite"
)
# Community DB — optional. Skipped gracefully if COMMUNITY_DB_URL is unset. # Community DB — optional. Skipped gracefully if COMMUNITY_DB_URL is unset.
community_db_url = os.environ.get("COMMUNITY_DB_URL", "") community_db_url = os.environ.get("COMMUNITY_DB_URL", "")
if community_db_url: if community_db_url:
@ -446,7 +475,7 @@ def session_info(response: Response, session: CloudUser = Depends(get_session)):
def _trigger_scraper_enrichment( def _trigger_scraper_enrichment(
listings: list, listings: list,
shared_store: Store, shared_store: SharedTableProtocol,
shared_db: Path, shared_db: Path,
user_db: Path | None = None, user_db: Path | None = None,
query: str = "", query: str = "",
@ -512,7 +541,7 @@ def _trigger_scraper_enrichment(
if not session_id or session_id not in _update_queues: if not session_id or session_id not in _update_queues:
return return
q = _update_queues[session_id] q = _update_queues[session_id]
thread_shared = Store(shared_db) thread_shared = shared_store.clone()
thread_user = Store(user_db or shared_db) thread_user = Store(user_db or shared_db)
scorer = TrustScorer(thread_shared) scorer = TrustScorer(thread_shared)
comp = thread_shared.get_market_comp("ebay", hashlib.md5(query.encode()).hexdigest()) comp = thread_shared.get_market_comp("ebay", hashlib.md5(query.encode()).hexdigest())
@ -538,7 +567,7 @@ def _trigger_scraper_enrichment(
def _run(): def _run():
try: try:
enricher = ScrapedEbayAdapter(Store(shared_db)) enricher = ScrapedEbayAdapter(shared_store.clone())
if needs_btf: if needs_btf:
enricher.enrich_sellers_btf(needs_btf, max_workers=2) enricher.enrich_sellers_btf(needs_btf, max_workers=2)
log.info("BTF enrichment complete for %d sellers", len(needs_btf)) log.info("BTF enrichment complete for %d sellers", len(needs_btf))
@ -812,7 +841,7 @@ def search(
_update_queues[session_id] = _queue.SimpleQueue() _update_queues[session_id] = _queue.SimpleQueue()
try: try:
shared_store = Store(shared_db) shared_store = _make_shared_store(shared_db)
user_store = Store(user_db) user_store = Store(user_db)
# Re-hydrate Listing dataclass instances from the cached dicts so the # Re-hydrate Listing dataclass instances from the cached dicts so the
@ -897,13 +926,14 @@ def search(
_evict_expired_cache() _evict_expired_cache()
log.info("cache: miss key=%s q=%r", cache_key, q) log.info("cache: miss key=%s q=%r", cache_key, q)
# Each thread creates its own Store — sqlite3 check_same_thread=True. # Each thread creates its own store via clone() — sqlite3 check_same_thread=True;
# SnipeSharedStore.clone() returns self (ThreadedConnectionPool is thread-safe).
def _run_search(ebay_query: str) -> list: def _run_search(ebay_query: str) -> list:
return _make_adapter(Store(shared_db), adapter, platform=platform).search(ebay_query, base_filters) return _make_adapter(_make_shared_store(shared_db), adapter, platform=platform).search(ebay_query, base_filters)
def _run_comps() -> None: def _run_comps() -> None:
try: try:
_make_adapter(Store(shared_db), adapter, platform=platform).get_completed_sales(comp_query, pages) _make_adapter(_make_shared_store(shared_db), adapter, platform=platform).get_completed_sales(comp_query, pages)
except Exception: except Exception:
log.warning("comps: unhandled exception for %r", comp_query, exc_info=True) log.warning("comps: unhandled exception for %r", comp_query, exc_info=True)
@ -944,10 +974,9 @@ def search(
_update_queues[session_id] = _queue.SimpleQueue() _update_queues[session_id] = _queue.SimpleQueue()
try: try:
# Main-thread stores — fresh connections, same thread. # Main-thread stores — shared_store may be Postgres (sellers, market_comps);
# shared_store: sellers, market_comps (all users share this data) # user_store is always per-user SQLite (listings, trust_scores, saved_searches).
# user_store: listings, saved_searches (per-user in cloud mode, same file in local mode) shared_store = _make_shared_store(shared_db)
shared_store = Store(shared_db)
user_store = Store(user_db) user_store = Store(user_db)
user_store.save_listings(listings) user_store.save_listings(listings)
@ -1207,7 +1236,7 @@ def search_async(
cached_listings_raw = payload["listings"] cached_listings_raw = payload["listings"]
cached_market_price = payload["market_price"] cached_market_price = payload["market_price"]
try: try:
shared_store = Store(_shared_db) shared_store = _make_shared_store(_shared_db)
user_store = Store(_user_db) user_store = Store(_user_db)
listings = [_Listing(**d) for d in cached_listings_raw] listings = [_Listing(**d) for d in cached_listings_raw]
user_store.save_listings(listings) user_store.save_listings(listings)
@ -1287,11 +1316,11 @@ def search_async(
try: try:
def _run_search(ebay_query: str) -> list: def _run_search(ebay_query: str) -> list:
return _make_adapter(Store(_shared_db), adapter, platform=platform).search(ebay_query, base_filters) return _make_adapter(_make_shared_store(_shared_db), adapter, platform=platform).search(ebay_query, base_filters)
def _run_comps() -> None: def _run_comps() -> None:
try: try:
_make_adapter(Store(_shared_db), adapter, platform=platform).get_completed_sales(comp_query, pages) _make_adapter(_make_shared_store(_shared_db), adapter, platform=platform).get_completed_sales(comp_query, pages)
except Exception: except Exception:
log.warning("async comps: unhandled exception for %r", comp_query, exc_info=True) log.warning("async comps: unhandled exception for %r", comp_query, exc_info=True)
@ -1314,7 +1343,7 @@ def search_async(
platform, _auth_label(_user_id), _tier, adapter_used, pages, len(listings), q_norm, platform, _auth_label(_user_id), _tier, adapter_used, pages, len(listings), q_norm,
) )
shared_store = Store(_shared_db) shared_store = _make_shared_store(_shared_db)
user_store = Store(_user_db) user_store = Store(_user_db)
user_store.save_listings(listings) user_store.save_listings(listings)
@ -1473,7 +1502,7 @@ def enrich_seller(
""" """
import threading import threading
shared_store = Store(session.shared_db) shared_store = _make_shared_store(session.shared_db)
user_store = Store(session.user_db) user_store = Store(session.user_db)
shared_db = session.shared_db shared_db = session.shared_db
@ -1502,7 +1531,7 @@ def enrich_seller(
def _btf(): def _btf():
try: try:
ScrapedEbayAdapter(Store(shared_db)).enrich_sellers_btf( ScrapedEbayAdapter(shared_store.clone()).enrich_sellers_btf(
{seller: listing_id}, max_workers=1 {seller: listing_id}, max_workers=1
) )
except Exception as e: except Exception as e:
@ -1510,7 +1539,7 @@ def enrich_seller(
def _ssn(): def _ssn():
try: try:
ScrapedEbayAdapter(Store(shared_db)).enrich_sellers_categories( ScrapedEbayAdapter(shared_store.clone()).enrich_sellers_categories(
[seller], max_workers=1 [seller], max_workers=1
) )
except Exception as e: except Exception as e:
@ -1781,7 +1810,7 @@ class BlocklistAdd(BaseModel):
@app.get("/api/blocklist") @app.get("/api/blocklist")
def list_blocklist(session: CloudUser = Depends(get_session)): def list_blocklist(session: CloudUser = Depends(get_session)):
store = Store(session.shared_db) store = _make_shared_store(session.shared_db)
return {"entries": [dataclasses.asdict(e) for e in store.list_blocklist()]} return {"entries": [dataclasses.asdict(e) for e in store.list_blocklist()]}
@ -1792,7 +1821,7 @@ def add_to_blocklist(body: BlocklistAdd, session: CloudUser = Depends(get_sessio
status_code=403, status_code=403,
detail="Sign in to report sellers to the community blocklist.", detail="Sign in to report sellers to the community blocklist.",
) )
store = Store(session.shared_db) store = _make_shared_store(session.shared_db)
entry = store.add_to_blocklist(ScammerEntry( entry = store.add_to_blocklist(ScammerEntry(
platform=body.platform, platform=body.platform,
platform_seller_id=body.platform_seller_id, platform_seller_id=body.platform_seller_id,
@ -1826,13 +1855,13 @@ def add_to_blocklist(body: BlocklistAdd, session: CloudUser = Depends(get_sessio
@app.delete("/api/blocklist/{platform_seller_id}", status_code=204) @app.delete("/api/blocklist/{platform_seller_id}", status_code=204)
def remove_from_blocklist(platform_seller_id: str, session: CloudUser = Depends(get_session)): def remove_from_blocklist(platform_seller_id: str, session: CloudUser = Depends(get_session)):
Store(session.shared_db).remove_from_blocklist("ebay", platform_seller_id) _make_shared_store(session.shared_db).remove_from_blocklist("ebay", platform_seller_id)
@app.get("/api/blocklist/export") @app.get("/api/blocklist/export")
def export_blocklist(session: CloudUser = Depends(get_session)): def export_blocklist(session: CloudUser = Depends(get_session)):
"""Download the blocklist as a CSV file.""" """Download the blocklist as a CSV file."""
entries = Store(session.shared_db).list_blocklist() entries = _make_shared_store(session.shared_db).list_blocklist()
buf = io.StringIO() buf = io.StringIO()
writer = csv.writer(buf) writer = csv.writer(buf)
writer.writerow(["platform", "platform_seller_id", "username", "reason", "source", "created_at"]) writer.writerow(["platform", "platform_seller_id", "username", "reason", "source", "created_at"])
@ -1864,7 +1893,7 @@ async def import_blocklist(
except UnicodeDecodeError: except UnicodeDecodeError:
raise HTTPException(status_code=400, detail="File must be UTF-8 encoded") raise HTTPException(status_code=400, detail="File must be UTF-8 encoded")
store = Store(session.shared_db) store = _make_shared_store(session.shared_db)
imported = 0 imported = 0
errors: list[str] = [] errors: list[str] = []
reader = csv.DictReader(io.StringIO(text)) reader = csv.DictReader(io.StringIO(text))

View file

@ -0,0 +1,49 @@
-- Snipe shared tables: sellers, market_comps, reported_sellers
-- Replaces the equivalent tables in shared.db (SQLite).
-- Per-user tables (listings, trust_scores, saved_searches) remain in SQLite.
CREATE TABLE IF NOT EXISTS sellers (
id BIGSERIAL PRIMARY KEY,
platform TEXT NOT NULL,
platform_seller_id TEXT NOT NULL,
username TEXT NOT NULL,
account_age_days INTEGER,
feedback_count INTEGER NOT NULL DEFAULT 0,
feedback_ratio DOUBLE PRECISION NOT NULL DEFAULT 0,
category_history_json TEXT NOT NULL DEFAULT '{}',
fetched_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (platform, platform_seller_id)
);
CREATE TABLE IF NOT EXISTS market_comps (
id BIGSERIAL PRIMARY KEY,
platform TEXT NOT NULL,
query_hash TEXT NOT NULL,
median_price DOUBLE PRECISION NOT NULL,
sample_count INTEGER NOT NULL,
fetched_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
expires_at TIMESTAMPTZ NOT NULL,
UNIQUE (platform, query_hash)
);
CREATE TABLE IF NOT EXISTS reported_sellers (
id BIGSERIAL PRIMARY KEY,
platform TEXT NOT NULL,
platform_seller_id TEXT NOT NULL,
username TEXT,
reported_by TEXT NOT NULL DEFAULT 'user',
reported_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (platform, platform_seller_id)
);
CREATE TABLE IF NOT EXISTS scammer_blocklist (
id BIGSERIAL PRIMARY KEY,
platform TEXT NOT NULL,
platform_seller_id TEXT NOT NULL,
username TEXT NOT NULL,
reason TEXT,
source TEXT NOT NULL DEFAULT 'manual',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (platform, platform_seller_id)
);

View file

380
app/db/pg_shared.py Normal file
View file

@ -0,0 +1,380 @@
from __future__ import annotations
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
import psycopg2
from psycopg2.pool import ThreadedConnectionPool
from app.db.models import MarketComp, ScammerEntry, Seller
log = logging.getLogger(__name__)
_MIN_CONN = 2
_MAX_CONN = 20
class SnipeSharedDB:
"""Thread-safe Postgres connection pool for Snipe shared tables."""
def __init__(self, dsn: str) -> None:
self._pool = ThreadedConnectionPool(_MIN_CONN, _MAX_CONN, dsn=dsn)
def getconn(self):
return self._pool.getconn()
def putconn(self, conn) -> None:
self._pool.putconn(conn)
def close(self) -> None:
self._pool.closeall()
def run_migrations(self) -> None:
"""Apply pg_migrations/*.sql in filename order. Idempotent."""
migrations_dir = Path(__file__).parent / "pg_migrations"
files = sorted(migrations_dir.glob("*.sql"), key=lambda p: p.name)
conn = self.getconn()
try:
with conn.cursor() as cur:
cur.execute("""
CREATE TABLE IF NOT EXISTS _snipe_shared_migrations (
filename TEXT PRIMARY KEY,
applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)
""")
conn.commit()
for f in files:
cur.execute(
"SELECT 1 FROM _snipe_shared_migrations WHERE filename = %s",
(f.name,),
)
if cur.fetchone():
continue
log.info("Applying migration: %s", f.name)
cur.execute(f.read_text())
cur.execute(
"INSERT INTO _snipe_shared_migrations (filename) VALUES (%s)",
(f.name,),
)
conn.commit()
except Exception:
conn.rollback()
raise
finally:
self.putconn(conn)
class SnipeSharedStore:
"""Postgres-backed store for sellers, market_comps, and reported_sellers.
Satisfies SharedTableProtocol. clone() returns self ThreadedConnectionPool
is already thread-safe, so no new instance is needed per thread.
"""
def __init__(self, db: SnipeSharedDB) -> None:
self._db = db
def clone(self) -> "SnipeSharedStore":
return self
# Sellers
def save_seller(self, seller: Seller) -> None:
self.save_sellers([seller])
def save_sellers(self, sellers: list[Seller]) -> None:
if not sellers:
return
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.executemany(
"""
INSERT INTO sellers
(platform, platform_seller_id, username, account_age_days,
feedback_count, feedback_ratio, category_history_json)
VALUES (%s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (platform, platform_seller_id) DO UPDATE SET
username = EXCLUDED.username,
feedback_count = EXCLUDED.feedback_count,
feedback_ratio = EXCLUDED.feedback_ratio,
account_age_days = COALESCE(
EXCLUDED.account_age_days,
sellers.account_age_days
),
category_history_json = COALESCE(
NULLIF(NULLIF(EXCLUDED.category_history_json, '{}'), ''),
NULLIF(NULLIF(sellers.category_history_json, '{}'), ''),
'{}'
),
fetched_at = NOW()
""",
[
(s.platform, s.platform_seller_id, s.username, s.account_age_days,
s.feedback_count, s.feedback_ratio, s.category_history_json or "{}")
for s in sellers
],
)
conn.commit()
except Exception:
conn.rollback()
raise
finally:
self._db.putconn(conn)
def get_seller(self, platform: str, platform_seller_id: str) -> Optional[Seller]:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"""
SELECT platform, platform_seller_id, username, account_age_days,
feedback_count, feedback_ratio, category_history_json,
id, fetched_at
FROM sellers
WHERE platform = %s AND platform_seller_id = %s
""",
(platform, platform_seller_id),
)
row = cur.fetchone()
if not row:
return None
return Seller(*row[:7], id=row[7], fetched_at=str(row[8]))
finally:
self._db.putconn(conn)
def delete_seller_data(self, platform: str, platform_seller_id: str) -> None:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"DELETE FROM sellers WHERE platform = %s AND platform_seller_id = %s",
(platform, platform_seller_id),
)
conn.commit()
except Exception:
conn.rollback()
raise
finally:
self._db.putconn(conn)
# MarketComps
def save_market_comp(self, comp: MarketComp) -> None:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO market_comps
(platform, query_hash, median_price, sample_count, expires_at)
VALUES (%s, %s, %s, %s, %s::TIMESTAMPTZ)
ON CONFLICT (platform, query_hash) DO UPDATE SET
median_price = EXCLUDED.median_price,
sample_count = EXCLUDED.sample_count,
expires_at = EXCLUDED.expires_at,
fetched_at = NOW()
""",
(comp.platform, comp.query_hash, comp.median_price,
comp.sample_count, comp.expires_at),
)
conn.commit()
except Exception:
conn.rollback()
raise
finally:
self._db.putconn(conn)
def get_market_comp(self, platform: str, query_hash: str) -> Optional[MarketComp]:
now = datetime.now(timezone.utc).isoformat()
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"""
SELECT platform, query_hash, median_price, sample_count,
expires_at, id, fetched_at
FROM market_comps
WHERE platform = %s AND query_hash = %s AND expires_at > %s::TIMESTAMPTZ
""",
(platform, query_hash, now),
)
row = cur.fetchone()
if not row:
return None
return MarketComp(*row[:5], id=row[5], fetched_at=str(row[6]))
finally:
self._db.putconn(conn)
# Reported Sellers
def mark_reported(
self,
platform: str,
platform_seller_id: str,
username: Optional[str] = None,
reported_by: str = "user",
) -> None:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO reported_sellers
(platform, platform_seller_id, username, reported_by)
VALUES (%s, %s, %s, %s)
ON CONFLICT (platform, platform_seller_id) DO NOTHING
""",
(platform, platform_seller_id, username, reported_by),
)
conn.commit()
except Exception:
conn.rollback()
raise
finally:
self._db.putconn(conn)
def list_reported(self, platform: str = "ebay") -> list[str]:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"SELECT platform_seller_id FROM reported_sellers WHERE platform = %s",
(platform,),
)
return [row[0] for row in cur.fetchall()]
finally:
self._db.putconn(conn)
# Seller Category Refresh
def refresh_seller_categories(
self,
platform: str,
seller_ids: list[str],
listing_store=None, # always a SQLite Store in practice
) -> int:
"""Derive category_history_json from listing data and update sellers in Postgres.
listing_store must be provided (it's always the per-user SQLite Store).
Returns count of sellers updated.
"""
from app.platforms.ebay.scraper import _classify_category_label # lazy to avoid circular
import json
if not seller_ids or listing_store is None:
return 0
updated = 0
for sid in seller_ids:
seller = self.get_seller(platform, sid)
if not seller or seller.category_history_json not in ("{}", "", None):
continue
# listing_store is always a SQLite Store; access _conn directly for the query.
rows = listing_store._conn.execute(
"SELECT category_name, COUNT(*) FROM listings "
"WHERE platform=? AND seller_platform_id=? AND category_name IS NOT NULL "
"GROUP BY category_name",
(platform, sid),
).fetchall()
if not rows:
continue
counts: dict[str, int] = {}
for cat_name, cnt in rows:
key = _classify_category_label(cat_name)
if key:
counts[key] = counts.get(key, 0) + cnt
if counts:
from dataclasses import replace
self.save_sellers([replace(seller, category_history_json=json.dumps(counts))])
updated += 1
return updated
# Scammer Blocklist
def is_blocklisted(self, platform: str, platform_seller_id: str) -> bool:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"SELECT 1 FROM scammer_blocklist "
"WHERE platform = %s AND platform_seller_id = %s LIMIT 1",
(platform, platform_seller_id),
)
return cur.fetchone() is not None
finally:
self._db.putconn(conn)
def add_to_blocklist(self, entry: ScammerEntry) -> ScammerEntry:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO scammer_blocklist
(platform, platform_seller_id, username, reason, source)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (platform, platform_seller_id) DO UPDATE SET
username = EXCLUDED.username,
reason = COALESCE(EXCLUDED.reason, scammer_blocklist.reason),
source = EXCLUDED.source
""",
(entry.platform, entry.platform_seller_id, entry.username,
entry.reason, entry.source),
)
conn.commit()
cur.execute(
"SELECT id, created_at FROM scammer_blocklist "
"WHERE platform = %s AND platform_seller_id = %s",
(entry.platform, entry.platform_seller_id),
)
row = cur.fetchone()
from dataclasses import replace
return replace(entry, id=row[0], created_at=str(row[1]))
except Exception:
conn.rollback()
raise
finally:
self._db.putconn(conn)
def remove_from_blocklist(self, platform: str, platform_seller_id: str) -> None:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"DELETE FROM scammer_blocklist "
"WHERE platform = %s AND platform_seller_id = %s",
(platform, platform_seller_id),
)
conn.commit()
except Exception:
conn.rollback()
raise
finally:
self._db.putconn(conn)
def list_blocklist(self, platform: str = "ebay") -> list[ScammerEntry]:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"""
SELECT platform, platform_seller_id, username, reason, source, id, created_at
FROM scammer_blocklist
WHERE platform = %s
ORDER BY created_at DESC
""",
(platform,),
)
return [
ScammerEntry(
platform=r[0], platform_seller_id=r[1], username=r[2],
reason=r[3], source=r[4], id=r[5], created_at=str(r[6]),
)
for r in cur.fetchall()
]
finally:
self._db.putconn(conn)

86
app/db/protocol.py Normal file
View file

@ -0,0 +1,86 @@
"""Protocol (duck-type interface) for shared table backends (SQLite and Postgres)."""
from __future__ import annotations
from typing import Any, Optional, Protocol, runtime_checkable
from app.db.models import MarketComp, ScammerEntry, Seller
@runtime_checkable
class SharedTableProtocol(Protocol):
"""Protocol that both Store (SQLite) and SnipeSharedStore (Postgres) must satisfy.
This enables code that reads/writes shared tables (sellers, market_comps,
reported_sellers, scammer_blocklist) to remain agnostic to the underlying backend.
"""
def save_seller(self, seller: Seller) -> None:
"""Persist a single seller record."""
...
def save_sellers(self, sellers: list[Seller]) -> None:
"""Persist multiple seller records (batch upsert)."""
...
def get_seller(self, platform: str, platform_seller_id: str) -> Optional[Seller]:
"""Fetch a single seller by platform and platform_seller_id."""
...
def save_market_comp(self, comp: MarketComp) -> None:
"""Persist a market comparison record."""
...
def get_market_comp(self, platform: str, query_hash: str) -> Optional[MarketComp]:
"""Fetch a market comparison by platform and query_hash."""
...
def mark_reported(
self,
platform: str,
platform_seller_id: str,
username: Optional[str] = None,
reported_by: str = "user",
) -> None:
"""Record that a seller has been reported to the platform."""
...
def list_reported(self, platform: str = "ebay") -> list[str]:
"""Return all platform_seller_ids that have been reported."""
...
def delete_seller_data(self, platform: str, platform_seller_id: str) -> None:
"""Permanently erase a seller and all related data (GDPR/eBay compliance)."""
...
def refresh_seller_categories(
self,
platform: str,
seller_ids: list[str],
listing_store: Optional[Any] = None,
) -> int:
"""Derive category_history_json for sellers that lack it from stored listings.
listing_store: Store holding listings (may differ from self in split-DB mode).
Returns count of sellers updated.
"""
...
def is_blocklisted(self, platform: str, platform_seller_id: str) -> bool:
"""Return True if a seller is on the community scammer blocklist."""
...
def add_to_blocklist(self, entry: ScammerEntry) -> ScammerEntry:
"""Upsert a seller into the blocklist. Returns the saved entry with id and created_at."""
...
def remove_from_blocklist(self, platform: str, platform_seller_id: str) -> None:
"""Remove a seller from the blocklist."""
...
def list_blocklist(self, platform: str = "ebay") -> list[ScammerEntry]:
"""Return all blocklisted sellers for a platform, newest first."""
...
def clone(self) -> SharedTableProtocol:
"""Create a new independent instance pointing to the same backend."""
...

View file

@ -21,6 +21,10 @@ class Store:
# WAL mode: allows concurrent readers + one writer without blocking # WAL mode: allows concurrent readers + one writer without blocking
self._conn.execute("PRAGMA journal_mode=WAL") self._conn.execute("PRAGMA journal_mode=WAL")
def clone(self) -> Store:
"""Create a new independent instance pointing to the same database."""
return Store(self._db_path)
# --- Seller --- # --- Seller ---
def delete_seller_data(self, platform: str, platform_seller_id: str) -> None: def delete_seller_data(self, platform: str, platform_seller_id: str) -> None:

View file

@ -22,7 +22,7 @@ _SHOPPING_API_INTER_REQUEST_DELAY = 0.5 # seconds between successive calls
_SELLER_ENRICH_TTL_HOURS = 24 # skip re-enrichment within this window _SELLER_ENRICH_TTL_HOURS = 24 # skip re-enrichment within this window
from app.db.models import Listing, MarketComp, Seller from app.db.models import Listing, MarketComp, Seller
from app.db.store import Store from app.db.protocol import SharedTableProtocol
from app.platforms import PlatformAdapter, SearchFilters from app.platforms import PlatformAdapter, SearchFilters
from app.platforms.ebay.auth import EbayTokenManager from app.platforms.ebay.auth import EbayTokenManager
from app.platforms.ebay.normaliser import normalise_listing, normalise_seller from app.platforms.ebay.normaliser import normalise_listing, normalise_seller
@ -67,7 +67,7 @@ BROWSE_BASE = {
class EbayAdapter(PlatformAdapter): class EbayAdapter(PlatformAdapter):
def __init__(self, token_manager: EbayTokenManager, shared_store: Store, env: str = "production"): def __init__(self, token_manager: EbayTokenManager, shared_store: SharedTableProtocol, env: str = "production"):
self._tokens = token_manager self._tokens = token_manager
self._store = shared_store self._store = shared_store
self._env = env self._env = env

View file

@ -25,7 +25,7 @@ log = logging.getLogger(__name__)
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from app.db.models import Listing, MarketComp, Seller from app.db.models import Listing, MarketComp, Seller
from app.db.store import Store from app.db.protocol import SharedTableProtocol
from app.platforms import PlatformAdapter, SearchFilters from app.platforms import PlatformAdapter, SearchFilters
EBAY_SEARCH_URL = "https://www.ebay.com/sch/i.html" EBAY_SEARCH_URL = "https://www.ebay.com/sch/i.html"
@ -286,7 +286,7 @@ class ScrapedEbayAdapter(PlatformAdapter):
category_history) cause TrustScorer to set score_is_partial=True. category_history) cause TrustScorer to set score_is_partial=True.
""" """
def __init__(self, shared_store: Store, delay: float = 1.0): def __init__(self, shared_store: SharedTableProtocol, delay: float = 1.0):
self._store = shared_store self._store = shared_store
self._delay = delay self._delay = delay
@ -374,8 +374,6 @@ class ScrapedEbayAdapter(PlatformAdapter):
Does not raise failures per-seller are silently skipped so the main Does not raise failures per-seller are silently skipped so the main
search response is never blocked. search response is never blocked.
""" """
db_path = self._store._db_path # capture for thread-local Store creation
def _enrich_one(item: tuple[str, str]) -> None: def _enrich_one(item: tuple[str, str]) -> None:
seller_id, listing_id = item seller_id, listing_id = item
try: try:
@ -388,7 +386,7 @@ class ScrapedEbayAdapter(PlatformAdapter):
) )
if age_days is None and fb_count is None: if age_days is None and fb_count is None:
return # nothing new to write return # nothing new to write
thread_store = Store(db_path) thread_store = self._store.clone()
seller = thread_store.get_seller("ebay", seller_id) seller = thread_store.get_seller("ebay", seller_id)
if not seller: if not seller:
log.warning("BTF enrich: seller %s not found in DB", seller_id) log.warning("BTF enrich: seller %s not found in DB", seller_id)

View file

@ -2,7 +2,7 @@ import hashlib
import math import math
from app.db.models import Listing, TrustScore from app.db.models import Listing, TrustScore
from app.db.store import Store from app.db.protocol import SharedTableProtocol
from .aggregator import Aggregator from .aggregator import Aggregator
from .metadata import MetadataScorer from .metadata import MetadataScorer
@ -12,7 +12,7 @@ from .photo import PhotoScorer
class TrustScorer: class TrustScorer:
"""Orchestrates metadata + photo scoring for a batch of listings.""" """Orchestrates metadata + photo scoring for a batch of listings."""
def __init__(self, shared_store: Store): def __init__(self, shared_store: SharedTableProtocol):
self._store = shared_store self._store = shared_store
self._meta = MetadataScorer() self._meta = MetadataScorer()
self._photo = PhotoScorer() self._photo = PhotoScorer()

View file

@ -23,6 +23,9 @@ services:
# GPU_SERVER_URL routes LLM query builder through cf-orch for VRAM-aware scheduling. # GPU_SERVER_URL routes LLM query builder through cf-orch for VRAM-aware scheduling.
# Override in .env to use a different coordinator URL. # Override in .env to use a different coordinator URL.
GPU_SERVER_URL: "http://host.docker.internal:7700" GPU_SERVER_URL: "http://host.docker.internal:7700"
# SNIPE_SHARED_DB_URL — Postgres DSN for shared tables (sellers, market_comps, blocklist).
# Required for production multi-user deployments. Set in .env (never commit).
# SNIPE_SHARED_DB_URL: "postgresql://snipe:<password>@postgres:5432/snipe_shared"
CF_APP_NAME: snipe CF_APP_NAME: snipe
extra_hosts: extra_hosts:
- "host.docker.internal:host-gateway" - "host.docker.internal:host-gateway"

View file

@ -8,7 +8,7 @@ version = "0.3.0"
description = "Auction listing monitor and trust scorer" description = "Auction listing monitor and trust scorer"
requires-python = ">=3.11" requires-python = ">=3.11"
dependencies = [ dependencies = [
"circuitforge-core>=0.8.0", "circuitforge-core[community]>=0.8.0",
"streamlit>=1.32", "streamlit>=1.32",
"requests>=2.31", "requests>=2.31",
"imagehash>=4.3", "imagehash>=4.3",

17
tests/conftest.py Normal file
View file

@ -0,0 +1,17 @@
import os
import pytest
def pytest_configure(config):
config.addinivalue_line(
"markers",
"postgres: mark test as requiring a live Postgres instance (SNIPE_SHARED_DB_URL must be set)",
)
@pytest.fixture
def postgres_dsn():
dsn = os.environ.get("SNIPE_SHARED_DB_URL")
if not dsn:
pytest.skip("SNIPE_SHARED_DB_URL not set — skipping Postgres tests")
return dsn

157
tests/db/test_pg_shared.py Normal file
View file

@ -0,0 +1,157 @@
"""Tests for SnipeSharedStore — requires live Postgres via SNIPE_SHARED_DB_URL."""
import pytest
from app.db.models import MarketComp, Seller
from app.db.pg_shared import SnipeSharedDB, SnipeSharedStore
from app.db.protocol import SharedTableProtocol
@pytest.mark.postgres
def test_snipe_shared_store_satisfies_protocol(postgres_dsn):
assert issubclass(SnipeSharedStore, SharedTableProtocol)
@pytest.mark.postgres
def test_save_and_get_seller(postgres_dsn):
db = SnipeSharedDB(postgres_dsn)
db.run_migrations()
store = SnipeSharedStore(db)
seller = Seller(
platform="ebay",
platform_seller_id="test-seller-001",
username="testseller",
account_age_days=365,
feedback_count=100,
feedback_ratio=0.99,
category_history_json='{"electronics": 5}',
)
store.save_seller(seller)
result = store.get_seller("ebay", "test-seller-001")
assert result is not None
assert result.username == "testseller"
assert result.feedback_count == 100
store.delete_seller_data("ebay", "test-seller-001")
db.close()
@pytest.mark.postgres
def test_save_sellers_coalesce_preserves_age(postgres_dsn):
db = SnipeSharedDB(postgres_dsn)
db.run_migrations()
store = SnipeSharedStore(db)
seller_with_age = Seller(
platform="ebay", platform_seller_id="coalesce-test",
username="u", account_age_days=730,
feedback_count=50, feedback_ratio=0.95, category_history_json="{}",
)
store.save_seller(seller_with_age)
seller_without_age = Seller(
platform="ebay", platform_seller_id="coalesce-test",
username="u", account_age_days=None,
feedback_count=60, feedback_ratio=0.96, category_history_json="{}",
)
store.save_sellers([seller_without_age])
result = store.get_seller("ebay", "coalesce-test")
assert result.account_age_days == 730
assert result.feedback_count == 60
store.delete_seller_data("ebay", "coalesce-test")
db.close()
@pytest.mark.postgres
def test_market_comp_cache(postgres_dsn):
from datetime import datetime, timedelta, timezone
db = SnipeSharedDB(postgres_dsn)
db.run_migrations()
store = SnipeSharedStore(db)
expires = (datetime.now(timezone.utc) + timedelta(hours=1)).isoformat()
comp = MarketComp(
platform="ebay", query_hash="abc123",
median_price=49.99, sample_count=10, expires_at=expires,
)
store.save_market_comp(comp)
result = store.get_market_comp("ebay", "abc123")
assert result is not None
assert result.median_price == 49.99
db.close()
@pytest.mark.postgres
def test_reported_sellers(postgres_dsn):
db = SnipeSharedDB(postgres_dsn)
db.run_migrations()
store = SnipeSharedStore(db)
store.mark_reported("ebay", "bad-seller-99", username="badguy")
reported = store.list_reported("ebay")
assert "bad-seller-99" in reported
store.mark_reported("ebay", "bad-seller-99") # idempotent
db.close()
@pytest.mark.postgres
def test_clone_returns_self(postgres_dsn):
db = SnipeSharedDB(postgres_dsn)
store = SnipeSharedStore(db)
assert store.clone() is store
db.close()
@pytest.mark.postgres
def test_blocklist_add_get_remove(postgres_dsn):
from app.db.models import ScammerEntry
db = SnipeSharedDB(postgres_dsn)
db.run_migrations()
store = SnipeSharedStore(db)
assert not store.is_blocklisted("ebay", "bad-999")
entry = store.add_to_blocklist(ScammerEntry(
platform="ebay", platform_seller_id="bad-999",
username="scammer1", reason="sold fakes", source="manual",
))
assert entry.id is not None
assert store.is_blocklisted("ebay", "bad-999")
entries = store.list_blocklist("ebay")
assert any(e.platform_seller_id == "bad-999" for e in entries)
store.remove_from_blocklist("ebay", "bad-999")
assert not store.is_blocklisted("ebay", "bad-999")
db.close()
@pytest.mark.postgres
def test_blocklist_upsert_is_idempotent(postgres_dsn):
from app.db.models import ScammerEntry
db = SnipeSharedDB(postgres_dsn)
db.run_migrations()
store = SnipeSharedStore(db)
store.add_to_blocklist(ScammerEntry(
platform="ebay", platform_seller_id="dup-test",
username="seller", reason="reason1", source="manual",
))
# Second add — should not raise, should update username but preserve reason via COALESCE
store.add_to_blocklist(ScammerEntry(
platform="ebay", platform_seller_id="dup-test",
username="seller_updated", reason=None, source="community",
))
entries = [e for e in store.list_blocklist("ebay") if e.platform_seller_id == "dup-test"]
assert len(entries) == 1
assert entries[0].username == "seller_updated"
assert entries[0].reason == "reason1" # COALESCE preserved original reason
store.remove_from_blocklist("ebay", "dup-test")
db.close()

39
tests/db/test_protocol.py Normal file
View file

@ -0,0 +1,39 @@
"""Verify Store satisfies SharedTableProtocol at import time."""
from app.db.protocol import SharedTableProtocol
from app.db.store import Store
def test_store_satisfies_protocol():
assert issubclass(Store, SharedTableProtocol)
def test_store_clone_returns_new_instance(tmp_path):
db = tmp_path / "test.db"
s = Store(db)
clone = s.clone()
assert isinstance(clone, Store)
assert clone is not s
assert clone._db_path == db
def test_ebay_adapter_accepts_protocol():
from app.platforms.ebay.adapter import EbayAdapter
import tempfile
import pathlib
from unittest.mock import MagicMock
with tempfile.TemporaryDirectory() as tmp:
s = Store(pathlib.Path(tmp) / "t.db")
adapter = EbayAdapter(token_manager=MagicMock(), shared_store=s)
assert adapter._store is s
def test_scraped_adapter_no_db_path_ref():
from app.platforms.ebay.scraper import ScrapedEbayAdapter
import tempfile
import pathlib
with tempfile.TemporaryDirectory() as tmp:
s = Store(pathlib.Path(tmp) / "t.db")
adapter = ScrapedEbayAdapter(shared_store=s)
assert not hasattr(adapter, '_db_path_ref')