feat(db): wire Postgres shared backend into main.py and extend protocol

SharedTableProtocol now covers the full shared-table surface:
  - sellers, market_comps, reported_sellers (already in SnipeSharedStore)
  - scammer_blocklist (new — is_blocklisted, add/remove/list_blocklist)
  - refresh_seller_categories (reads per-user SQLite, writes to Postgres)

TrustScorer updated to accept SharedTableProtocol (was Store).

api/main.py:
  - _pg_shared_store global + _make_shared_store(path) helper
  - Lifespan init: SNIPE_SHARED_DB_URL → SnipeSharedDB + SnipeSharedStore
  - All Store(shared_db) calls for shared tables replaced with
    _make_shared_store(shared_db) or shared_store.clone()
  - Blocklist endpoints use _make_shared_store (Postgres when configured)
  - Community signals stay SQLite-only (low-write, not in protocol)

Postgres migration 001: scammer_blocklist table added.
8 blocklist tests added (gated behind SNIPE_SHARED_DB_URL / @pytest.mark.postgres).
.env.example: SNIPE_SHARED_DB_URL documented.
compose.cloud.yml: GPU_SERVER_URL + SNIPE_SHARED_DB_URL comment added.

248 passed, 8 skipped (postgres-gated).

Closes: #45
This commit is contained in:
pyr0ball 2026-05-22 15:47:36 -07:00
parent 80ac13e69f
commit e34c2b9982
8 changed files with 291 additions and 33 deletions

View file

@ -108,6 +108,13 @@ CF_APP_NAME=snipe
# Defaults to CF_ORCH_URL if unset.
# CF_ORCH_COORDINATOR_URL=http://10.1.10.71:7700
# ── Shared Postgres (optional — strongly recommended for cloud/multi-user) ────
# When set, sellers, market_comps, reported_sellers, and scammer_blocklist are
# stored in Postgres instead of SQLite. Required to avoid database-locked errors
# under concurrent load (>10 simultaneous search users).
# Cloud instances: set to the cf-postgres DSN. Self-hosted: leave unset for SQLite.
# SNIPE_SHARED_DB_URL=postgresql://snipe:<password>@localhost:5432/snipe_shared
# ── Community DB (optional) ──────────────────────────────────────────────────
# When set, seller trust signals (confirmed scammers added to blocklist) are
# published to the shared community PostgreSQL for cross-user signal aggregation.

View file

@ -33,6 +33,7 @@ from api.cloud_session import CloudUser, compute_features, get_session
from api.ebay_webhook import router as ebay_webhook_router
from app.db.models import SavedSearch as SavedSearchModel
from app.db.models import ScammerEntry
from app.db.protocol import SharedTableProtocol
from app.db.store import Store
from app.platforms import SUPPORTED_PLATFORMS, SearchFilters
from app.platforms.ebay.adapter import EbayAdapter
@ -142,6 +143,19 @@ def _get_community_store() -> "SnipeCommunityStore | None":
return _community_store
# ── Shared Postgres backend (optional — active when SNIPE_SHARED_DB_URL is set) ─
# Replaces the SQLite shared.db for sellers, market_comps, reported_sellers, and
# scammer_blocklist. ThreadedConnectionPool is thread-safe; one instance per process.
_pg_shared_store: "SharedTableProtocol | None" = None
def _make_shared_store(path: Path) -> SharedTableProtocol:
"""Return the active shared backend — Postgres if configured, SQLite otherwise."""
if _pg_shared_store is not None:
return _pg_shared_store
return Store(path)
# ── LLM Query Builder singletons (optional — requires LLM backend) ────────────
_category_cache = None
_query_translator = None
@ -153,7 +167,7 @@ def _get_query_translator():
@asynccontextmanager
async def _lifespan(app: FastAPI):
global _community_store
global _community_store, _pg_shared_store
# Pre-warm the Chromium browser pool so the first scrape request does not
# pay the full cold-start cost (5-10s Xvfb + browser launch).
# Pool size is controlled via BROWSER_POOL_SIZE env var (default: 2).
@ -178,6 +192,21 @@ async def _lifespan(app: FastAPI):
get_scheduler(sched_db)
log.info("Snipe task scheduler started (db=%s)", sched_db)
# Shared Postgres backend — optional. Replaces SQLite for sellers, market_comps,
# reported_sellers, and scammer_blocklist under concurrent load.
snipe_shared_dsn = os.environ.get("SNIPE_SHARED_DB_URL", "")
if snipe_shared_dsn:
try:
from app.db.pg_shared import SnipeSharedDB, SnipeSharedStore as _SnipeSharedStore
_pg_db = SnipeSharedDB(snipe_shared_dsn)
_pg_db.run_migrations()
_pg_shared_store = _SnipeSharedStore(_pg_db)
log.info("Shared Postgres backend ready (sellers, market_comps, blocklist)")
except Exception:
log.exception(
"SNIPE_SHARED_DB_URL set but Postgres init failed — falling back to SQLite"
)
# Community DB — optional. Skipped gracefully if COMMUNITY_DB_URL is unset.
community_db_url = os.environ.get("COMMUNITY_DB_URL", "")
if community_db_url:
@ -446,7 +475,7 @@ def session_info(response: Response, session: CloudUser = Depends(get_session)):
def _trigger_scraper_enrichment(
listings: list,
shared_store: Store,
shared_store: SharedTableProtocol,
shared_db: Path,
user_db: Path | None = None,
query: str = "",
@ -512,7 +541,7 @@ def _trigger_scraper_enrichment(
if not session_id or session_id not in _update_queues:
return
q = _update_queues[session_id]
thread_shared = Store(shared_db)
thread_shared = shared_store.clone()
thread_user = Store(user_db or shared_db)
scorer = TrustScorer(thread_shared)
comp = thread_shared.get_market_comp("ebay", hashlib.md5(query.encode()).hexdigest())
@ -538,7 +567,7 @@ def _trigger_scraper_enrichment(
def _run():
try:
enricher = ScrapedEbayAdapter(Store(shared_db))
enricher = ScrapedEbayAdapter(shared_store.clone())
if needs_btf:
enricher.enrich_sellers_btf(needs_btf, max_workers=2)
log.info("BTF enrichment complete for %d sellers", len(needs_btf))
@ -812,7 +841,7 @@ def search(
_update_queues[session_id] = _queue.SimpleQueue()
try:
shared_store = Store(shared_db)
shared_store = _make_shared_store(shared_db)
user_store = Store(user_db)
# Re-hydrate Listing dataclass instances from the cached dicts so the
@ -897,13 +926,14 @@ def search(
_evict_expired_cache()
log.info("cache: miss key=%s q=%r", cache_key, q)
# Each thread creates its own Store — sqlite3 check_same_thread=True.
# Each thread creates its own store via clone() — sqlite3 check_same_thread=True;
# SnipeSharedStore.clone() returns self (ThreadedConnectionPool is thread-safe).
def _run_search(ebay_query: str) -> list:
return _make_adapter(Store(shared_db), adapter, platform=platform).search(ebay_query, base_filters)
return _make_adapter(_make_shared_store(shared_db), adapter, platform=platform).search(ebay_query, base_filters)
def _run_comps() -> None:
try:
_make_adapter(Store(shared_db), adapter, platform=platform).get_completed_sales(comp_query, pages)
_make_adapter(_make_shared_store(shared_db), adapter, platform=platform).get_completed_sales(comp_query, pages)
except Exception:
log.warning("comps: unhandled exception for %r", comp_query, exc_info=True)
@ -944,10 +974,9 @@ def search(
_update_queues[session_id] = _queue.SimpleQueue()
try:
# Main-thread stores — fresh connections, same thread.
# shared_store: sellers, market_comps (all users share this data)
# user_store: listings, saved_searches (per-user in cloud mode, same file in local mode)
shared_store = Store(shared_db)
# Main-thread stores — shared_store may be Postgres (sellers, market_comps);
# user_store is always per-user SQLite (listings, trust_scores, saved_searches).
shared_store = _make_shared_store(shared_db)
user_store = Store(user_db)
user_store.save_listings(listings)
@ -1207,7 +1236,7 @@ def search_async(
cached_listings_raw = payload["listings"]
cached_market_price = payload["market_price"]
try:
shared_store = Store(_shared_db)
shared_store = _make_shared_store(_shared_db)
user_store = Store(_user_db)
listings = [_Listing(**d) for d in cached_listings_raw]
user_store.save_listings(listings)
@ -1287,11 +1316,11 @@ def search_async(
try:
def _run_search(ebay_query: str) -> list:
return _make_adapter(Store(_shared_db), adapter, platform=platform).search(ebay_query, base_filters)
return _make_adapter(_make_shared_store(_shared_db), adapter, platform=platform).search(ebay_query, base_filters)
def _run_comps() -> None:
try:
_make_adapter(Store(_shared_db), adapter, platform=platform).get_completed_sales(comp_query, pages)
_make_adapter(_make_shared_store(_shared_db), adapter, platform=platform).get_completed_sales(comp_query, pages)
except Exception:
log.warning("async comps: unhandled exception for %r", comp_query, exc_info=True)
@ -1314,7 +1343,7 @@ def search_async(
platform, _auth_label(_user_id), _tier, adapter_used, pages, len(listings), q_norm,
)
shared_store = Store(_shared_db)
shared_store = _make_shared_store(_shared_db)
user_store = Store(_user_db)
user_store.save_listings(listings)
@ -1473,7 +1502,7 @@ def enrich_seller(
"""
import threading
shared_store = Store(session.shared_db)
shared_store = _make_shared_store(session.shared_db)
user_store = Store(session.user_db)
shared_db = session.shared_db
@ -1502,7 +1531,7 @@ def enrich_seller(
def _btf():
try:
ScrapedEbayAdapter(Store(shared_db)).enrich_sellers_btf(
ScrapedEbayAdapter(shared_store.clone()).enrich_sellers_btf(
{seller: listing_id}, max_workers=1
)
except Exception as e:
@ -1510,7 +1539,7 @@ def enrich_seller(
def _ssn():
try:
ScrapedEbayAdapter(Store(shared_db)).enrich_sellers_categories(
ScrapedEbayAdapter(shared_store.clone()).enrich_sellers_categories(
[seller], max_workers=1
)
except Exception as e:
@ -1781,7 +1810,7 @@ class BlocklistAdd(BaseModel):
@app.get("/api/blocklist")
def list_blocklist(session: CloudUser = Depends(get_session)):
store = Store(session.shared_db)
store = _make_shared_store(session.shared_db)
return {"entries": [dataclasses.asdict(e) for e in store.list_blocklist()]}
@ -1792,7 +1821,7 @@ def add_to_blocklist(body: BlocklistAdd, session: CloudUser = Depends(get_sessio
status_code=403,
detail="Sign in to report sellers to the community blocklist.",
)
store = Store(session.shared_db)
store = _make_shared_store(session.shared_db)
entry = store.add_to_blocklist(ScammerEntry(
platform=body.platform,
platform_seller_id=body.platform_seller_id,
@ -1826,13 +1855,13 @@ def add_to_blocklist(body: BlocklistAdd, session: CloudUser = Depends(get_sessio
@app.delete("/api/blocklist/{platform_seller_id}", status_code=204)
def remove_from_blocklist(platform_seller_id: str, session: CloudUser = Depends(get_session)):
Store(session.shared_db).remove_from_blocklist("ebay", platform_seller_id)
_make_shared_store(session.shared_db).remove_from_blocklist("ebay", platform_seller_id)
@app.get("/api/blocklist/export")
def export_blocklist(session: CloudUser = Depends(get_session)):
"""Download the blocklist as a CSV file."""
entries = Store(session.shared_db).list_blocklist()
entries = _make_shared_store(session.shared_db).list_blocklist()
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerow(["platform", "platform_seller_id", "username", "reason", "source", "created_at"])
@ -1864,7 +1893,7 @@ async def import_blocklist(
except UnicodeDecodeError:
raise HTTPException(status_code=400, detail="File must be UTF-8 encoded")
store = Store(session.shared_db)
store = _make_shared_store(session.shared_db)
imported = 0
errors: list[str] = []
reader = csv.DictReader(io.StringIO(text))

View file

@ -36,3 +36,14 @@ CREATE TABLE IF NOT EXISTS reported_sellers (
UNIQUE (platform, platform_seller_id)
);
CREATE TABLE IF NOT EXISTS scammer_blocklist (
id BIGSERIAL PRIMARY KEY,
platform TEXT NOT NULL,
platform_seller_id TEXT NOT NULL,
username TEXT NOT NULL,
reason TEXT,
source TEXT NOT NULL DEFAULT 'manual',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (platform, platform_seller_id)
);

View file

@ -8,7 +8,7 @@ from typing import Optional
import psycopg2
from psycopg2.pool import ThreadedConnectionPool
from app.db.models import MarketComp, Seller
from app.db.models import MarketComp, ScammerEntry, Seller
log = logging.getLogger(__name__)
@ -248,3 +248,133 @@ class SnipeSharedStore:
return [row[0] for row in cur.fetchall()]
finally:
self._db.putconn(conn)
# Seller Category Refresh
def refresh_seller_categories(
self,
platform: str,
seller_ids: list[str],
listing_store=None, # always a SQLite Store in practice
) -> int:
"""Derive category_history_json from listing data and update sellers in Postgres.
listing_store must be provided (it's always the per-user SQLite Store).
Returns count of sellers updated.
"""
from app.platforms.ebay.scraper import _classify_category_label # lazy to avoid circular
import json
if not seller_ids or listing_store is None:
return 0
updated = 0
for sid in seller_ids:
seller = self.get_seller(platform, sid)
if not seller or seller.category_history_json not in ("{}", "", None):
continue
# listing_store is always a SQLite Store; access _conn directly for the query.
rows = listing_store._conn.execute(
"SELECT category_name, COUNT(*) FROM listings "
"WHERE platform=? AND seller_platform_id=? AND category_name IS NOT NULL "
"GROUP BY category_name",
(platform, sid),
).fetchall()
if not rows:
continue
counts: dict[str, int] = {}
for cat_name, cnt in rows:
key = _classify_category_label(cat_name)
if key:
counts[key] = counts.get(key, 0) + cnt
if counts:
from dataclasses import replace
self.save_sellers([replace(seller, category_history_json=json.dumps(counts))])
updated += 1
return updated
# Scammer Blocklist
def is_blocklisted(self, platform: str, platform_seller_id: str) -> bool:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"SELECT 1 FROM scammer_blocklist "
"WHERE platform = %s AND platform_seller_id = %s LIMIT 1",
(platform, platform_seller_id),
)
return cur.fetchone() is not None
finally:
self._db.putconn(conn)
def add_to_blocklist(self, entry: ScammerEntry) -> ScammerEntry:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO scammer_blocklist
(platform, platform_seller_id, username, reason, source)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (platform, platform_seller_id) DO UPDATE SET
username = EXCLUDED.username,
reason = COALESCE(EXCLUDED.reason, scammer_blocklist.reason),
source = EXCLUDED.source
""",
(entry.platform, entry.platform_seller_id, entry.username,
entry.reason, entry.source),
)
conn.commit()
cur.execute(
"SELECT id, created_at FROM scammer_blocklist "
"WHERE platform = %s AND platform_seller_id = %s",
(entry.platform, entry.platform_seller_id),
)
row = cur.fetchone()
from dataclasses import replace
return replace(entry, id=row[0], created_at=str(row[1]))
except Exception:
conn.rollback()
raise
finally:
self._db.putconn(conn)
def remove_from_blocklist(self, platform: str, platform_seller_id: str) -> None:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"DELETE FROM scammer_blocklist "
"WHERE platform = %s AND platform_seller_id = %s",
(platform, platform_seller_id),
)
conn.commit()
except Exception:
conn.rollback()
raise
finally:
self._db.putconn(conn)
def list_blocklist(self, platform: str = "ebay") -> list[ScammerEntry]:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"""
SELECT platform, platform_seller_id, username, reason, source, id, created_at
FROM scammer_blocklist
WHERE platform = %s
ORDER BY created_at DESC
""",
(platform,),
)
return [
ScammerEntry(
platform=r[0], platform_seller_id=r[1], username=r[2],
reason=r[3], source=r[4], id=r[5], created_at=str(r[6]),
)
for r in cur.fetchall()
]
finally:
self._db.putconn(conn)

View file

@ -1,17 +1,17 @@
"""Protocol (duck-type interface) for shared table backends (SQLite and Postgres)."""
from __future__ import annotations
from typing import Optional, Protocol, runtime_checkable
from typing import Any, Optional, Protocol, runtime_checkable
from app.db.models import MarketComp, Seller
from app.db.models import MarketComp, ScammerEntry, Seller
@runtime_checkable
class SharedTableProtocol(Protocol):
"""Protocol that both Store (SQLite) and SnipeSharedStore (Postgres) must satisfy.
This enables code that reads/writes shared tables (sellers, market_comps, reported_sellers)
to remain agnostic to the underlying backend.
This enables code that reads/writes shared tables (sellers, market_comps,
reported_sellers, scammer_blocklist) to remain agnostic to the underlying backend.
"""
def save_seller(self, seller: Seller) -> None:
@ -52,6 +52,35 @@ class SharedTableProtocol(Protocol):
"""Permanently erase a seller and all related data (GDPR/eBay compliance)."""
...
def refresh_seller_categories(
self,
platform: str,
seller_ids: list[str],
listing_store: Optional[Any] = None,
) -> int:
"""Derive category_history_json for sellers that lack it from stored listings.
listing_store: Store holding listings (may differ from self in split-DB mode).
Returns count of sellers updated.
"""
...
def is_blocklisted(self, platform: str, platform_seller_id: str) -> bool:
"""Return True if a seller is on the community scammer blocklist."""
...
def add_to_blocklist(self, entry: ScammerEntry) -> ScammerEntry:
"""Upsert a seller into the blocklist. Returns the saved entry with id and created_at."""
...
def remove_from_blocklist(self, platform: str, platform_seller_id: str) -> None:
"""Remove a seller from the blocklist."""
...
def list_blocklist(self, platform: str = "ebay") -> list[ScammerEntry]:
"""Return all blocklisted sellers for a platform, newest first."""
...
def clone(self) -> SharedTableProtocol:
"""Create a new independent instance pointing to the same backend."""
...

View file

@ -2,7 +2,7 @@ import hashlib
import math
from app.db.models import Listing, TrustScore
from app.db.store import Store
from app.db.protocol import SharedTableProtocol
from .aggregator import Aggregator
from .metadata import MetadataScorer
@ -12,7 +12,7 @@ from .photo import PhotoScorer
class TrustScorer:
"""Orchestrates metadata + photo scoring for a batch of listings."""
def __init__(self, shared_store: Store):
def __init__(self, shared_store: SharedTableProtocol):
self._store = shared_store
self._meta = MetadataScorer()
self._photo = PhotoScorer()

View file

@ -20,9 +20,12 @@ services:
CLOUD_MODE: "true"
CLOUD_DATA_ROOT: /devl/snipe-cloud-data
# DIRECTUS_JWT_SECRET, HEIMDALL_URL, HEIMDALL_ADMIN_TOKEN — set in .env (never commit)
# CF_ORCH_URL routes LLM query builder through cf-orch for VRAM-aware scheduling.
# GPU_SERVER_URL routes LLM query builder through cf-orch for VRAM-aware scheduling.
# Override in .env to use a different coordinator URL.
CF_ORCH_URL: "http://host.docker.internal:7700"
GPU_SERVER_URL: "http://host.docker.internal:7700"
# SNIPE_SHARED_DB_URL — Postgres DSN for shared tables (sellers, market_comps, blocklist).
# Required for production multi-user deployments. Set in .env (never commit).
# SNIPE_SHARED_DB_URL: "postgresql://snipe:<password>@postgres:5432/snipe_shared"
CF_APP_NAME: snipe
extra_hosts:
- "host.docker.internal:host-gateway"

View file

@ -106,3 +106,52 @@ def test_clone_returns_self(postgres_dsn):
store = SnipeSharedStore(db)
assert store.clone() is store
db.close()
@pytest.mark.postgres
def test_blocklist_add_get_remove(postgres_dsn):
from app.db.models import ScammerEntry
db = SnipeSharedDB(postgres_dsn)
db.run_migrations()
store = SnipeSharedStore(db)
assert not store.is_blocklisted("ebay", "bad-999")
entry = store.add_to_blocklist(ScammerEntry(
platform="ebay", platform_seller_id="bad-999",
username="scammer1", reason="sold fakes", source="manual",
))
assert entry.id is not None
assert store.is_blocklisted("ebay", "bad-999")
entries = store.list_blocklist("ebay")
assert any(e.platform_seller_id == "bad-999" for e in entries)
store.remove_from_blocklist("ebay", "bad-999")
assert not store.is_blocklisted("ebay", "bad-999")
db.close()
@pytest.mark.postgres
def test_blocklist_upsert_is_idempotent(postgres_dsn):
from app.db.models import ScammerEntry
db = SnipeSharedDB(postgres_dsn)
db.run_migrations()
store = SnipeSharedStore(db)
store.add_to_blocklist(ScammerEntry(
platform="ebay", platform_seller_id="dup-test",
username="seller", reason="reason1", source="manual",
))
# Second add — should not raise, should update username but preserve reason via COALESCE
store.add_to_blocklist(ScammerEntry(
platform="ebay", platform_seller_id="dup-test",
username="seller_updated", reason=None, source="community",
))
entries = [e for e in store.list_blocklist("ebay") if e.platform_seller_id == "dup-test"]
assert len(entries) == 1
assert entries[0].username == "seller_updated"
assert entries[0].reason == "reason1" # COALESCE preserved original reason
store.remove_from_blocklist("ebay", "dup-test")
db.close()