diff --git a/.env.example b/.env.example index 63140b1..bcac5f1 100644 --- a/.env.example +++ b/.env.example @@ -108,6 +108,13 @@ CF_APP_NAME=snipe # Defaults to CF_ORCH_URL if unset. # CF_ORCH_COORDINATOR_URL=http://10.1.10.71:7700 +# ── Shared Postgres (optional — strongly recommended for cloud/multi-user) ──── +# When set, sellers, market_comps, reported_sellers, and scammer_blocklist are +# stored in Postgres instead of SQLite. Required to avoid database-locked errors +# under concurrent load (>10 simultaneous search users). +# Cloud instances: set to the cf-postgres DSN. Self-hosted: leave unset for SQLite. +# SNIPE_SHARED_DB_URL=postgresql://snipe:@localhost:5432/snipe_shared + # ── Community DB (optional) ────────────────────────────────────────────────── # When set, seller trust signals (confirmed scammers added to blocklist) are # published to the shared community PostgreSQL for cross-user signal aggregation. diff --git a/api/main.py b/api/main.py index 1710b8a..e95e85f 100644 --- a/api/main.py +++ b/api/main.py @@ -33,6 +33,7 @@ from api.cloud_session import CloudUser, compute_features, get_session from api.ebay_webhook import router as ebay_webhook_router from app.db.models import SavedSearch as SavedSearchModel from app.db.models import ScammerEntry +from app.db.protocol import SharedTableProtocol from app.db.store import Store from app.platforms import SUPPORTED_PLATFORMS, SearchFilters from app.platforms.ebay.adapter import EbayAdapter @@ -142,6 +143,19 @@ def _get_community_store() -> "SnipeCommunityStore | None": return _community_store +# ── Shared Postgres backend (optional — active when SNIPE_SHARED_DB_URL is set) ─ +# Replaces the SQLite shared.db for sellers, market_comps, reported_sellers, and +# scammer_blocklist. ThreadedConnectionPool is thread-safe; one instance per process. +_pg_shared_store: "SharedTableProtocol | None" = None + + +def _make_shared_store(path: Path) -> SharedTableProtocol: + """Return the active shared backend — Postgres if configured, SQLite otherwise.""" + if _pg_shared_store is not None: + return _pg_shared_store + return Store(path) + + # ── LLM Query Builder singletons (optional — requires LLM backend) ──────────── _category_cache = None _query_translator = None @@ -153,7 +167,7 @@ def _get_query_translator(): @asynccontextmanager async def _lifespan(app: FastAPI): - global _community_store + global _community_store, _pg_shared_store # Pre-warm the Chromium browser pool so the first scrape request does not # pay the full cold-start cost (5-10s Xvfb + browser launch). # Pool size is controlled via BROWSER_POOL_SIZE env var (default: 2). @@ -178,6 +192,21 @@ async def _lifespan(app: FastAPI): get_scheduler(sched_db) log.info("Snipe task scheduler started (db=%s)", sched_db) + # Shared Postgres backend — optional. Replaces SQLite for sellers, market_comps, + # reported_sellers, and scammer_blocklist under concurrent load. + snipe_shared_dsn = os.environ.get("SNIPE_SHARED_DB_URL", "") + if snipe_shared_dsn: + try: + from app.db.pg_shared import SnipeSharedDB, SnipeSharedStore as _SnipeSharedStore + _pg_db = SnipeSharedDB(snipe_shared_dsn) + _pg_db.run_migrations() + _pg_shared_store = _SnipeSharedStore(_pg_db) + log.info("Shared Postgres backend ready (sellers, market_comps, blocklist)") + except Exception: + log.exception( + "SNIPE_SHARED_DB_URL set but Postgres init failed — falling back to SQLite" + ) + # Community DB — optional. Skipped gracefully if COMMUNITY_DB_URL is unset. community_db_url = os.environ.get("COMMUNITY_DB_URL", "") if community_db_url: @@ -446,7 +475,7 @@ def session_info(response: Response, session: CloudUser = Depends(get_session)): def _trigger_scraper_enrichment( listings: list, - shared_store: Store, + shared_store: SharedTableProtocol, shared_db: Path, user_db: Path | None = None, query: str = "", @@ -512,7 +541,7 @@ def _trigger_scraper_enrichment( if not session_id or session_id not in _update_queues: return q = _update_queues[session_id] - thread_shared = Store(shared_db) + thread_shared = shared_store.clone() thread_user = Store(user_db or shared_db) scorer = TrustScorer(thread_shared) comp = thread_shared.get_market_comp("ebay", hashlib.md5(query.encode()).hexdigest()) @@ -538,7 +567,7 @@ def _trigger_scraper_enrichment( def _run(): try: - enricher = ScrapedEbayAdapter(Store(shared_db)) + enricher = ScrapedEbayAdapter(shared_store.clone()) if needs_btf: enricher.enrich_sellers_btf(needs_btf, max_workers=2) log.info("BTF enrichment complete for %d sellers", len(needs_btf)) @@ -812,7 +841,7 @@ def search( _update_queues[session_id] = _queue.SimpleQueue() try: - shared_store = Store(shared_db) + shared_store = _make_shared_store(shared_db) user_store = Store(user_db) # Re-hydrate Listing dataclass instances from the cached dicts so the @@ -897,13 +926,14 @@ def search( _evict_expired_cache() log.info("cache: miss key=%s q=%r", cache_key, q) - # Each thread creates its own Store — sqlite3 check_same_thread=True. + # Each thread creates its own store via clone() — sqlite3 check_same_thread=True; + # SnipeSharedStore.clone() returns self (ThreadedConnectionPool is thread-safe). def _run_search(ebay_query: str) -> list: - return _make_adapter(Store(shared_db), adapter, platform=platform).search(ebay_query, base_filters) + return _make_adapter(_make_shared_store(shared_db), adapter, platform=platform).search(ebay_query, base_filters) def _run_comps() -> None: try: - _make_adapter(Store(shared_db), adapter, platform=platform).get_completed_sales(comp_query, pages) + _make_adapter(_make_shared_store(shared_db), adapter, platform=platform).get_completed_sales(comp_query, pages) except Exception: log.warning("comps: unhandled exception for %r", comp_query, exc_info=True) @@ -944,10 +974,9 @@ def search( _update_queues[session_id] = _queue.SimpleQueue() try: - # Main-thread stores — fresh connections, same thread. - # shared_store: sellers, market_comps (all users share this data) - # user_store: listings, saved_searches (per-user in cloud mode, same file in local mode) - shared_store = Store(shared_db) + # Main-thread stores — shared_store may be Postgres (sellers, market_comps); + # user_store is always per-user SQLite (listings, trust_scores, saved_searches). + shared_store = _make_shared_store(shared_db) user_store = Store(user_db) user_store.save_listings(listings) @@ -1207,7 +1236,7 @@ def search_async( cached_listings_raw = payload["listings"] cached_market_price = payload["market_price"] try: - shared_store = Store(_shared_db) + shared_store = _make_shared_store(_shared_db) user_store = Store(_user_db) listings = [_Listing(**d) for d in cached_listings_raw] user_store.save_listings(listings) @@ -1287,11 +1316,11 @@ def search_async( try: def _run_search(ebay_query: str) -> list: - return _make_adapter(Store(_shared_db), adapter, platform=platform).search(ebay_query, base_filters) + return _make_adapter(_make_shared_store(_shared_db), adapter, platform=platform).search(ebay_query, base_filters) def _run_comps() -> None: try: - _make_adapter(Store(_shared_db), adapter, platform=platform).get_completed_sales(comp_query, pages) + _make_adapter(_make_shared_store(_shared_db), adapter, platform=platform).get_completed_sales(comp_query, pages) except Exception: log.warning("async comps: unhandled exception for %r", comp_query, exc_info=True) @@ -1314,7 +1343,7 @@ def search_async( platform, _auth_label(_user_id), _tier, adapter_used, pages, len(listings), q_norm, ) - shared_store = Store(_shared_db) + shared_store = _make_shared_store(_shared_db) user_store = Store(_user_db) user_store.save_listings(listings) @@ -1473,7 +1502,7 @@ def enrich_seller( """ import threading - shared_store = Store(session.shared_db) + shared_store = _make_shared_store(session.shared_db) user_store = Store(session.user_db) shared_db = session.shared_db @@ -1502,7 +1531,7 @@ def enrich_seller( def _btf(): try: - ScrapedEbayAdapter(Store(shared_db)).enrich_sellers_btf( + ScrapedEbayAdapter(shared_store.clone()).enrich_sellers_btf( {seller: listing_id}, max_workers=1 ) except Exception as e: @@ -1510,7 +1539,7 @@ def enrich_seller( def _ssn(): try: - ScrapedEbayAdapter(Store(shared_db)).enrich_sellers_categories( + ScrapedEbayAdapter(shared_store.clone()).enrich_sellers_categories( [seller], max_workers=1 ) except Exception as e: @@ -1781,7 +1810,7 @@ class BlocklistAdd(BaseModel): @app.get("/api/blocklist") def list_blocklist(session: CloudUser = Depends(get_session)): - store = Store(session.shared_db) + store = _make_shared_store(session.shared_db) return {"entries": [dataclasses.asdict(e) for e in store.list_blocklist()]} @@ -1792,7 +1821,7 @@ def add_to_blocklist(body: BlocklistAdd, session: CloudUser = Depends(get_sessio status_code=403, detail="Sign in to report sellers to the community blocklist.", ) - store = Store(session.shared_db) + store = _make_shared_store(session.shared_db) entry = store.add_to_blocklist(ScammerEntry( platform=body.platform, platform_seller_id=body.platform_seller_id, @@ -1826,13 +1855,13 @@ def add_to_blocklist(body: BlocklistAdd, session: CloudUser = Depends(get_sessio @app.delete("/api/blocklist/{platform_seller_id}", status_code=204) def remove_from_blocklist(platform_seller_id: str, session: CloudUser = Depends(get_session)): - Store(session.shared_db).remove_from_blocklist("ebay", platform_seller_id) + _make_shared_store(session.shared_db).remove_from_blocklist("ebay", platform_seller_id) @app.get("/api/blocklist/export") def export_blocklist(session: CloudUser = Depends(get_session)): """Download the blocklist as a CSV file.""" - entries = Store(session.shared_db).list_blocklist() + entries = _make_shared_store(session.shared_db).list_blocklist() buf = io.StringIO() writer = csv.writer(buf) writer.writerow(["platform", "platform_seller_id", "username", "reason", "source", "created_at"]) @@ -1864,7 +1893,7 @@ async def import_blocklist( except UnicodeDecodeError: raise HTTPException(status_code=400, detail="File must be UTF-8 encoded") - store = Store(session.shared_db) + store = _make_shared_store(session.shared_db) imported = 0 errors: list[str] = [] reader = csv.DictReader(io.StringIO(text)) diff --git a/app/db/pg_migrations/001_shared_tables.sql b/app/db/pg_migrations/001_shared_tables.sql index a82f135..662ef79 100644 --- a/app/db/pg_migrations/001_shared_tables.sql +++ b/app/db/pg_migrations/001_shared_tables.sql @@ -36,3 +36,14 @@ CREATE TABLE IF NOT EXISTS reported_sellers ( UNIQUE (platform, platform_seller_id) ); +CREATE TABLE IF NOT EXISTS scammer_blocklist ( + id BIGSERIAL PRIMARY KEY, + platform TEXT NOT NULL, + platform_seller_id TEXT NOT NULL, + username TEXT NOT NULL, + reason TEXT, + source TEXT NOT NULL DEFAULT 'manual', + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + UNIQUE (platform, platform_seller_id) +); + diff --git a/app/db/pg_shared.py b/app/db/pg_shared.py index cccd520..7ab9ac9 100644 --- a/app/db/pg_shared.py +++ b/app/db/pg_shared.py @@ -8,7 +8,7 @@ from typing import Optional import psycopg2 from psycopg2.pool import ThreadedConnectionPool -from app.db.models import MarketComp, Seller +from app.db.models import MarketComp, ScammerEntry, Seller log = logging.getLogger(__name__) @@ -248,3 +248,133 @@ class SnipeSharedStore: return [row[0] for row in cur.fetchall()] finally: self._db.putconn(conn) + + # Seller Category Refresh + + def refresh_seller_categories( + self, + platform: str, + seller_ids: list[str], + listing_store=None, # always a SQLite Store in practice + ) -> int: + """Derive category_history_json from listing data and update sellers in Postgres. + + listing_store must be provided (it's always the per-user SQLite Store). + Returns count of sellers updated. + """ + from app.platforms.ebay.scraper import _classify_category_label # lazy to avoid circular + import json + + if not seller_ids or listing_store is None: + return 0 + + updated = 0 + for sid in seller_ids: + seller = self.get_seller(platform, sid) + if not seller or seller.category_history_json not in ("{}", "", None): + continue + # listing_store is always a SQLite Store; access _conn directly for the query. + rows = listing_store._conn.execute( + "SELECT category_name, COUNT(*) FROM listings " + "WHERE platform=? AND seller_platform_id=? AND category_name IS NOT NULL " + "GROUP BY category_name", + (platform, sid), + ).fetchall() + if not rows: + continue + counts: dict[str, int] = {} + for cat_name, cnt in rows: + key = _classify_category_label(cat_name) + if key: + counts[key] = counts.get(key, 0) + cnt + if counts: + from dataclasses import replace + self.save_sellers([replace(seller, category_history_json=json.dumps(counts))]) + updated += 1 + return updated + + # Scammer Blocklist + + def is_blocklisted(self, platform: str, platform_seller_id: str) -> bool: + conn = self._db.getconn() + try: + with conn.cursor() as cur: + cur.execute( + "SELECT 1 FROM scammer_blocklist " + "WHERE platform = %s AND platform_seller_id = %s LIMIT 1", + (platform, platform_seller_id), + ) + return cur.fetchone() is not None + finally: + self._db.putconn(conn) + + def add_to_blocklist(self, entry: ScammerEntry) -> ScammerEntry: + conn = self._db.getconn() + try: + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO scammer_blocklist + (platform, platform_seller_id, username, reason, source) + VALUES (%s, %s, %s, %s, %s) + ON CONFLICT (platform, platform_seller_id) DO UPDATE SET + username = EXCLUDED.username, + reason = COALESCE(EXCLUDED.reason, scammer_blocklist.reason), + source = EXCLUDED.source + """, + (entry.platform, entry.platform_seller_id, entry.username, + entry.reason, entry.source), + ) + conn.commit() + cur.execute( + "SELECT id, created_at FROM scammer_blocklist " + "WHERE platform = %s AND platform_seller_id = %s", + (entry.platform, entry.platform_seller_id), + ) + row = cur.fetchone() + from dataclasses import replace + return replace(entry, id=row[0], created_at=str(row[1])) + except Exception: + conn.rollback() + raise + finally: + self._db.putconn(conn) + + def remove_from_blocklist(self, platform: str, platform_seller_id: str) -> None: + conn = self._db.getconn() + try: + with conn.cursor() as cur: + cur.execute( + "DELETE FROM scammer_blocklist " + "WHERE platform = %s AND platform_seller_id = %s", + (platform, platform_seller_id), + ) + conn.commit() + except Exception: + conn.rollback() + raise + finally: + self._db.putconn(conn) + + def list_blocklist(self, platform: str = "ebay") -> list[ScammerEntry]: + conn = self._db.getconn() + try: + with conn.cursor() as cur: + cur.execute( + """ + SELECT platform, platform_seller_id, username, reason, source, id, created_at + FROM scammer_blocklist + WHERE platform = %s + ORDER BY created_at DESC + """, + (platform,), + ) + return [ + ScammerEntry( + platform=r[0], platform_seller_id=r[1], username=r[2], + reason=r[3], source=r[4], id=r[5], created_at=str(r[6]), + ) + for r in cur.fetchall() + ] + finally: + self._db.putconn(conn) diff --git a/app/db/protocol.py b/app/db/protocol.py index 93fb268..615f83e 100644 --- a/app/db/protocol.py +++ b/app/db/protocol.py @@ -1,17 +1,17 @@ """Protocol (duck-type interface) for shared table backends (SQLite and Postgres).""" from __future__ import annotations -from typing import Optional, Protocol, runtime_checkable +from typing import Any, Optional, Protocol, runtime_checkable -from app.db.models import MarketComp, Seller +from app.db.models import MarketComp, ScammerEntry, Seller @runtime_checkable class SharedTableProtocol(Protocol): """Protocol that both Store (SQLite) and SnipeSharedStore (Postgres) must satisfy. - This enables code that reads/writes shared tables (sellers, market_comps, reported_sellers) - to remain agnostic to the underlying backend. + This enables code that reads/writes shared tables (sellers, market_comps, + reported_sellers, scammer_blocklist) to remain agnostic to the underlying backend. """ def save_seller(self, seller: Seller) -> None: @@ -52,6 +52,35 @@ class SharedTableProtocol(Protocol): """Permanently erase a seller and all related data (GDPR/eBay compliance).""" ... + def refresh_seller_categories( + self, + platform: str, + seller_ids: list[str], + listing_store: Optional[Any] = None, + ) -> int: + """Derive category_history_json for sellers that lack it from stored listings. + + listing_store: Store holding listings (may differ from self in split-DB mode). + Returns count of sellers updated. + """ + ... + + def is_blocklisted(self, platform: str, platform_seller_id: str) -> bool: + """Return True if a seller is on the community scammer blocklist.""" + ... + + def add_to_blocklist(self, entry: ScammerEntry) -> ScammerEntry: + """Upsert a seller into the blocklist. Returns the saved entry with id and created_at.""" + ... + + def remove_from_blocklist(self, platform: str, platform_seller_id: str) -> None: + """Remove a seller from the blocklist.""" + ... + + def list_blocklist(self, platform: str = "ebay") -> list[ScammerEntry]: + """Return all blocklisted sellers for a platform, newest first.""" + ... + def clone(self) -> SharedTableProtocol: """Create a new independent instance pointing to the same backend.""" ... diff --git a/app/trust/__init__.py b/app/trust/__init__.py index 8a8667c..71a8b04 100644 --- a/app/trust/__init__.py +++ b/app/trust/__init__.py @@ -2,7 +2,7 @@ import hashlib import math from app.db.models import Listing, TrustScore -from app.db.store import Store +from app.db.protocol import SharedTableProtocol from .aggregator import Aggregator from .metadata import MetadataScorer @@ -12,7 +12,7 @@ from .photo import PhotoScorer class TrustScorer: """Orchestrates metadata + photo scoring for a batch of listings.""" - def __init__(self, shared_store: Store): + def __init__(self, shared_store: SharedTableProtocol): self._store = shared_store self._meta = MetadataScorer() self._photo = PhotoScorer() diff --git a/compose.cloud.yml b/compose.cloud.yml index af83ccf..6178fcf 100644 --- a/compose.cloud.yml +++ b/compose.cloud.yml @@ -20,9 +20,12 @@ services: CLOUD_MODE: "true" CLOUD_DATA_ROOT: /devl/snipe-cloud-data # DIRECTUS_JWT_SECRET, HEIMDALL_URL, HEIMDALL_ADMIN_TOKEN — set in .env (never commit) - # CF_ORCH_URL routes LLM query builder through cf-orch for VRAM-aware scheduling. + # GPU_SERVER_URL routes LLM query builder through cf-orch for VRAM-aware scheduling. # Override in .env to use a different coordinator URL. - CF_ORCH_URL: "http://host.docker.internal:7700" + GPU_SERVER_URL: "http://host.docker.internal:7700" + # SNIPE_SHARED_DB_URL — Postgres DSN for shared tables (sellers, market_comps, blocklist). + # Required for production multi-user deployments. Set in .env (never commit). + # SNIPE_SHARED_DB_URL: "postgresql://snipe:@postgres:5432/snipe_shared" CF_APP_NAME: snipe extra_hosts: - "host.docker.internal:host-gateway" diff --git a/tests/db/test_pg_shared.py b/tests/db/test_pg_shared.py index 3736428..d7555c8 100644 --- a/tests/db/test_pg_shared.py +++ b/tests/db/test_pg_shared.py @@ -106,3 +106,52 @@ def test_clone_returns_self(postgres_dsn): store = SnipeSharedStore(db) assert store.clone() is store db.close() + + +@pytest.mark.postgres +def test_blocklist_add_get_remove(postgres_dsn): + from app.db.models import ScammerEntry + db = SnipeSharedDB(postgres_dsn) + db.run_migrations() + store = SnipeSharedStore(db) + + assert not store.is_blocklisted("ebay", "bad-999") + + entry = store.add_to_blocklist(ScammerEntry( + platform="ebay", platform_seller_id="bad-999", + username="scammer1", reason="sold fakes", source="manual", + )) + assert entry.id is not None + assert store.is_blocklisted("ebay", "bad-999") + + entries = store.list_blocklist("ebay") + assert any(e.platform_seller_id == "bad-999" for e in entries) + + store.remove_from_blocklist("ebay", "bad-999") + assert not store.is_blocklisted("ebay", "bad-999") + db.close() + + +@pytest.mark.postgres +def test_blocklist_upsert_is_idempotent(postgres_dsn): + from app.db.models import ScammerEntry + db = SnipeSharedDB(postgres_dsn) + db.run_migrations() + store = SnipeSharedStore(db) + + store.add_to_blocklist(ScammerEntry( + platform="ebay", platform_seller_id="dup-test", + username="seller", reason="reason1", source="manual", + )) + # Second add — should not raise, should update username but preserve reason via COALESCE + store.add_to_blocklist(ScammerEntry( + platform="ebay", platform_seller_id="dup-test", + username="seller_updated", reason=None, source="community", + )) + entries = [e for e in store.list_blocklist("ebay") if e.platform_seller_id == "dup-test"] + assert len(entries) == 1 + assert entries[0].username == "seller_updated" + assert entries[0].reason == "reason1" # COALESCE preserved original reason + + store.remove_from_blocklist("ebay", "dup-test") + db.close()