Compare commits
11 commits
4dd44fdafb
...
9bd6d9513e
| Author | SHA1 | Date | |
|---|---|---|---|
| 9bd6d9513e | |||
| 341d66d5f0 | |||
| e34c2b9982 | |||
| cc997c09e3 | |||
| c10a481ce3 | |||
| 80ac13e69f | |||
| 9d8b627fe1 | |||
| 1d6556072f | |||
| 78809c761e | |||
| 6fbcf90740 | |||
| 5ddfbece8e |
22 changed files with 832 additions and 53 deletions
15
.env.example
15
.env.example
|
|
@ -98,16 +98,25 @@ CF_APP_NAME=snipe
|
|||
# OLLAMA_HOST=http://localhost:11434
|
||||
# OLLAMA_MODEL=llava:7b
|
||||
|
||||
# CF Orchestrator — routes vision/LLM tasks to a cf-orch coordinator for VRAM management.
|
||||
# GPU Server — routes vision/LLM tasks to a cf-orch coordinator for VRAM management.
|
||||
# Self-hosted: point at a local cf-orch coordinator if you have one running.
|
||||
# Cloud (internal): managed coordinator at orch.circuitforge.tech.
|
||||
# Leave unset to run vision tasks inline (no VRAM coordination).
|
||||
# CF_ORCH_URL=http://10.1.10.71:7700
|
||||
# GPU_SERVER_URL=http://10.1.10.71:7700
|
||||
#
|
||||
# CF_ORCH_URL is accepted as a backward-compat alias for GPU_SERVER_URL.
|
||||
#
|
||||
# cf-orch agent (compose --profile orch) — coordinator URL for the sidecar agent.
|
||||
# Defaults to CF_ORCH_URL if unset.
|
||||
# Defaults to GPU_SERVER_URL if unset.
|
||||
# CF_ORCH_COORDINATOR_URL=http://10.1.10.71:7700
|
||||
|
||||
# ── Shared Postgres (optional — strongly recommended for cloud/multi-user) ────
|
||||
# When set, sellers, market_comps, reported_sellers, and scammer_blocklist are
|
||||
# stored in Postgres instead of SQLite. Required to avoid database-locked errors
|
||||
# under concurrent load (>10 simultaneous search users).
|
||||
# Cloud instances: set to the cf-postgres DSN. Self-hosted: leave unset for SQLite.
|
||||
# SNIPE_SHARED_DB_URL=postgresql://snipe:<password>@localhost:5432/snipe_shared
|
||||
|
||||
# ── Community DB (optional) ──────────────────────────────────────────────────
|
||||
# When set, seller trust signals (confirmed scammers added to blocklist) are
|
||||
# published to the shared community PostgreSQL for cross-user signal aggregation.
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ WORKDIR /app
|
|||
# System deps for Playwright/Chromium
|
||||
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
xvfb \
|
||||
libpq-dev \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# Install circuitforge-core from sibling directory (compose sets context: ..)
|
||||
|
|
|
|||
|
|
@ -174,6 +174,8 @@ Snipe uses a dual license:
|
|||
| Discovery pipeline — scrapers, platform adapters, search, keyword filtering | [MIT](LICENSE-MIT) |
|
||||
| LLM trust-scoring, query builder, vision assessment, AI features | [BSL 1.1](LICENSE-BSL) — free for personal non-commercial self-hosting; commercial use requires a paid license; converts to MIT after 4 years |
|
||||
|
||||
Humans own design, architecture, code review, testing, and verification. LLMs are part of our development workflow. [Our positions on LLM use →](https://circuitforge.tech/positions)
|
||||
|
||||
Privacy · Safety · Accessibility — co-equal, non-negotiable.
|
||||
|
||||
[circuitforge.tech](https://circuitforge.tech)
|
||||
|
|
|
|||
79
api/main.py
79
api/main.py
|
|
@ -33,6 +33,7 @@ from api.cloud_session import CloudUser, compute_features, get_session
|
|||
from api.ebay_webhook import router as ebay_webhook_router
|
||||
from app.db.models import SavedSearch as SavedSearchModel
|
||||
from app.db.models import ScammerEntry
|
||||
from app.db.protocol import SharedTableProtocol
|
||||
from app.db.store import Store
|
||||
from app.platforms import SUPPORTED_PLATFORMS, SearchFilters
|
||||
from app.platforms.ebay.adapter import EbayAdapter
|
||||
|
|
@ -142,6 +143,19 @@ def _get_community_store() -> "SnipeCommunityStore | None":
|
|||
return _community_store
|
||||
|
||||
|
||||
# ── Shared Postgres backend (optional — active when SNIPE_SHARED_DB_URL is set) ─
|
||||
# Replaces the SQLite shared.db for sellers, market_comps, reported_sellers, and
|
||||
# scammer_blocklist. ThreadedConnectionPool is thread-safe; one instance per process.
|
||||
_pg_shared_store: "SharedTableProtocol | None" = None
|
||||
|
||||
|
||||
def _make_shared_store(path: Path) -> SharedTableProtocol:
|
||||
"""Return the active shared backend — Postgres if configured, SQLite otherwise."""
|
||||
if _pg_shared_store is not None:
|
||||
return _pg_shared_store
|
||||
return Store(path)
|
||||
|
||||
|
||||
# ── LLM Query Builder singletons (optional — requires LLM backend) ────────────
|
||||
_category_cache = None
|
||||
_query_translator = None
|
||||
|
|
@ -153,7 +167,7 @@ def _get_query_translator():
|
|||
|
||||
@asynccontextmanager
|
||||
async def _lifespan(app: FastAPI):
|
||||
global _community_store
|
||||
global _community_store, _pg_shared_store
|
||||
# Pre-warm the Chromium browser pool so the first scrape request does not
|
||||
# pay the full cold-start cost (5-10s Xvfb + browser launch).
|
||||
# Pool size is controlled via BROWSER_POOL_SIZE env var (default: 2).
|
||||
|
|
@ -178,6 +192,21 @@ async def _lifespan(app: FastAPI):
|
|||
get_scheduler(sched_db)
|
||||
log.info("Snipe task scheduler started (db=%s)", sched_db)
|
||||
|
||||
# Shared Postgres backend — optional. Replaces SQLite for sellers, market_comps,
|
||||
# reported_sellers, and scammer_blocklist under concurrent load.
|
||||
snipe_shared_dsn = os.environ.get("SNIPE_SHARED_DB_URL", "")
|
||||
if snipe_shared_dsn:
|
||||
try:
|
||||
from app.db.pg_shared import SnipeSharedDB, SnipeSharedStore as _SnipeSharedStore
|
||||
_pg_db = SnipeSharedDB(snipe_shared_dsn)
|
||||
_pg_db.run_migrations()
|
||||
_pg_shared_store = _SnipeSharedStore(_pg_db)
|
||||
log.info("Shared Postgres backend ready (sellers, market_comps, blocklist)")
|
||||
except Exception:
|
||||
log.exception(
|
||||
"SNIPE_SHARED_DB_URL set but Postgres init failed — falling back to SQLite"
|
||||
)
|
||||
|
||||
# Community DB — optional. Skipped gracefully if COMMUNITY_DB_URL is unset.
|
||||
community_db_url = os.environ.get("COMMUNITY_DB_URL", "")
|
||||
if community_db_url:
|
||||
|
|
@ -209,7 +238,7 @@ async def _lifespan(app: FastAPI):
|
|||
_category_cache.refresh(token_manager=None) # bootstrap fallback
|
||||
|
||||
try:
|
||||
cforch_url = os.getenv("CF_ORCH_URL") or None
|
||||
cforch_url = os.getenv("GPU_SERVER_URL") or os.getenv("CF_ORCH_URL") or None
|
||||
if cforch_url:
|
||||
_query_translator = QueryTranslator(
|
||||
category_cache=_category_cache,
|
||||
|
|
@ -446,7 +475,7 @@ def session_info(response: Response, session: CloudUser = Depends(get_session)):
|
|||
|
||||
def _trigger_scraper_enrichment(
|
||||
listings: list,
|
||||
shared_store: Store,
|
||||
shared_store: SharedTableProtocol,
|
||||
shared_db: Path,
|
||||
user_db: Path | None = None,
|
||||
query: str = "",
|
||||
|
|
@ -512,7 +541,7 @@ def _trigger_scraper_enrichment(
|
|||
if not session_id or session_id not in _update_queues:
|
||||
return
|
||||
q = _update_queues[session_id]
|
||||
thread_shared = Store(shared_db)
|
||||
thread_shared = shared_store.clone()
|
||||
thread_user = Store(user_db or shared_db)
|
||||
scorer = TrustScorer(thread_shared)
|
||||
comp = thread_shared.get_market_comp("ebay", hashlib.md5(query.encode()).hexdigest())
|
||||
|
|
@ -538,7 +567,7 @@ def _trigger_scraper_enrichment(
|
|||
|
||||
def _run():
|
||||
try:
|
||||
enricher = ScrapedEbayAdapter(Store(shared_db))
|
||||
enricher = ScrapedEbayAdapter(shared_store.clone())
|
||||
if needs_btf:
|
||||
enricher.enrich_sellers_btf(needs_btf, max_workers=2)
|
||||
log.info("BTF enrichment complete for %d sellers", len(needs_btf))
|
||||
|
|
@ -812,7 +841,7 @@ def search(
|
|||
_update_queues[session_id] = _queue.SimpleQueue()
|
||||
|
||||
try:
|
||||
shared_store = Store(shared_db)
|
||||
shared_store = _make_shared_store(shared_db)
|
||||
user_store = Store(user_db)
|
||||
|
||||
# Re-hydrate Listing dataclass instances from the cached dicts so the
|
||||
|
|
@ -897,13 +926,14 @@ def search(
|
|||
_evict_expired_cache()
|
||||
log.info("cache: miss key=%s q=%r", cache_key, q)
|
||||
|
||||
# Each thread creates its own Store — sqlite3 check_same_thread=True.
|
||||
# Each thread creates its own store via clone() — sqlite3 check_same_thread=True;
|
||||
# SnipeSharedStore.clone() returns self (ThreadedConnectionPool is thread-safe).
|
||||
def _run_search(ebay_query: str) -> list:
|
||||
return _make_adapter(Store(shared_db), adapter, platform=platform).search(ebay_query, base_filters)
|
||||
return _make_adapter(_make_shared_store(shared_db), adapter, platform=platform).search(ebay_query, base_filters)
|
||||
|
||||
def _run_comps() -> None:
|
||||
try:
|
||||
_make_adapter(Store(shared_db), adapter, platform=platform).get_completed_sales(comp_query, pages)
|
||||
_make_adapter(_make_shared_store(shared_db), adapter, platform=platform).get_completed_sales(comp_query, pages)
|
||||
except Exception:
|
||||
log.warning("comps: unhandled exception for %r", comp_query, exc_info=True)
|
||||
|
||||
|
|
@ -944,10 +974,9 @@ def search(
|
|||
_update_queues[session_id] = _queue.SimpleQueue()
|
||||
|
||||
try:
|
||||
# Main-thread stores — fresh connections, same thread.
|
||||
# shared_store: sellers, market_comps (all users share this data)
|
||||
# user_store: listings, saved_searches (per-user in cloud mode, same file in local mode)
|
||||
shared_store = Store(shared_db)
|
||||
# Main-thread stores — shared_store may be Postgres (sellers, market_comps);
|
||||
# user_store is always per-user SQLite (listings, trust_scores, saved_searches).
|
||||
shared_store = _make_shared_store(shared_db)
|
||||
user_store = Store(user_db)
|
||||
|
||||
user_store.save_listings(listings)
|
||||
|
|
@ -1207,7 +1236,7 @@ def search_async(
|
|||
cached_listings_raw = payload["listings"]
|
||||
cached_market_price = payload["market_price"]
|
||||
try:
|
||||
shared_store = Store(_shared_db)
|
||||
shared_store = _make_shared_store(_shared_db)
|
||||
user_store = Store(_user_db)
|
||||
listings = [_Listing(**d) for d in cached_listings_raw]
|
||||
user_store.save_listings(listings)
|
||||
|
|
@ -1287,11 +1316,11 @@ def search_async(
|
|||
|
||||
try:
|
||||
def _run_search(ebay_query: str) -> list:
|
||||
return _make_adapter(Store(_shared_db), adapter, platform=platform).search(ebay_query, base_filters)
|
||||
return _make_adapter(_make_shared_store(_shared_db), adapter, platform=platform).search(ebay_query, base_filters)
|
||||
|
||||
def _run_comps() -> None:
|
||||
try:
|
||||
_make_adapter(Store(_shared_db), adapter, platform=platform).get_completed_sales(comp_query, pages)
|
||||
_make_adapter(_make_shared_store(_shared_db), adapter, platform=platform).get_completed_sales(comp_query, pages)
|
||||
except Exception:
|
||||
log.warning("async comps: unhandled exception for %r", comp_query, exc_info=True)
|
||||
|
||||
|
|
@ -1314,7 +1343,7 @@ def search_async(
|
|||
platform, _auth_label(_user_id), _tier, adapter_used, pages, len(listings), q_norm,
|
||||
)
|
||||
|
||||
shared_store = Store(_shared_db)
|
||||
shared_store = _make_shared_store(_shared_db)
|
||||
user_store = Store(_user_db)
|
||||
|
||||
user_store.save_listings(listings)
|
||||
|
|
@ -1473,7 +1502,7 @@ def enrich_seller(
|
|||
"""
|
||||
import threading
|
||||
|
||||
shared_store = Store(session.shared_db)
|
||||
shared_store = _make_shared_store(session.shared_db)
|
||||
user_store = Store(session.user_db)
|
||||
shared_db = session.shared_db
|
||||
|
||||
|
|
@ -1502,7 +1531,7 @@ def enrich_seller(
|
|||
|
||||
def _btf():
|
||||
try:
|
||||
ScrapedEbayAdapter(Store(shared_db)).enrich_sellers_btf(
|
||||
ScrapedEbayAdapter(shared_store.clone()).enrich_sellers_btf(
|
||||
{seller: listing_id}, max_workers=1
|
||||
)
|
||||
except Exception as e:
|
||||
|
|
@ -1510,7 +1539,7 @@ def enrich_seller(
|
|||
|
||||
def _ssn():
|
||||
try:
|
||||
ScrapedEbayAdapter(Store(shared_db)).enrich_sellers_categories(
|
||||
ScrapedEbayAdapter(shared_store.clone()).enrich_sellers_categories(
|
||||
[seller], max_workers=1
|
||||
)
|
||||
except Exception as e:
|
||||
|
|
@ -1781,7 +1810,7 @@ class BlocklistAdd(BaseModel):
|
|||
|
||||
@app.get("/api/blocklist")
|
||||
def list_blocklist(session: CloudUser = Depends(get_session)):
|
||||
store = Store(session.shared_db)
|
||||
store = _make_shared_store(session.shared_db)
|
||||
return {"entries": [dataclasses.asdict(e) for e in store.list_blocklist()]}
|
||||
|
||||
|
||||
|
|
@ -1792,7 +1821,7 @@ def add_to_blocklist(body: BlocklistAdd, session: CloudUser = Depends(get_sessio
|
|||
status_code=403,
|
||||
detail="Sign in to report sellers to the community blocklist.",
|
||||
)
|
||||
store = Store(session.shared_db)
|
||||
store = _make_shared_store(session.shared_db)
|
||||
entry = store.add_to_blocklist(ScammerEntry(
|
||||
platform=body.platform,
|
||||
platform_seller_id=body.platform_seller_id,
|
||||
|
|
@ -1826,13 +1855,13 @@ def add_to_blocklist(body: BlocklistAdd, session: CloudUser = Depends(get_sessio
|
|||
|
||||
@app.delete("/api/blocklist/{platform_seller_id}", status_code=204)
|
||||
def remove_from_blocklist(platform_seller_id: str, session: CloudUser = Depends(get_session)):
|
||||
Store(session.shared_db).remove_from_blocklist("ebay", platform_seller_id)
|
||||
_make_shared_store(session.shared_db).remove_from_blocklist("ebay", platform_seller_id)
|
||||
|
||||
|
||||
@app.get("/api/blocklist/export")
|
||||
def export_blocklist(session: CloudUser = Depends(get_session)):
|
||||
"""Download the blocklist as a CSV file."""
|
||||
entries = Store(session.shared_db).list_blocklist()
|
||||
entries = _make_shared_store(session.shared_db).list_blocklist()
|
||||
buf = io.StringIO()
|
||||
writer = csv.writer(buf)
|
||||
writer.writerow(["platform", "platform_seller_id", "username", "reason", "source", "created_at"])
|
||||
|
|
@ -1864,7 +1893,7 @@ async def import_blocklist(
|
|||
except UnicodeDecodeError:
|
||||
raise HTTPException(status_code=400, detail="File must be UTF-8 encoded")
|
||||
|
||||
store = Store(session.shared_db)
|
||||
store = _make_shared_store(session.shared_db)
|
||||
imported = 0
|
||||
errors: list[str] = []
|
||||
reader = csv.DictReader(io.StringIO(text))
|
||||
|
|
|
|||
49
app/db/pg_migrations/001_shared_tables.sql
Normal file
49
app/db/pg_migrations/001_shared_tables.sql
Normal file
|
|
@ -0,0 +1,49 @@
|
|||
-- Snipe shared tables: sellers, market_comps, reported_sellers
|
||||
-- Replaces the equivalent tables in shared.db (SQLite).
|
||||
-- Per-user tables (listings, trust_scores, saved_searches) remain in SQLite.
|
||||
|
||||
CREATE TABLE IF NOT EXISTS sellers (
|
||||
id BIGSERIAL PRIMARY KEY,
|
||||
platform TEXT NOT NULL,
|
||||
platform_seller_id TEXT NOT NULL,
|
||||
username TEXT NOT NULL,
|
||||
account_age_days INTEGER,
|
||||
feedback_count INTEGER NOT NULL DEFAULT 0,
|
||||
feedback_ratio DOUBLE PRECISION NOT NULL DEFAULT 0,
|
||||
category_history_json TEXT NOT NULL DEFAULT '{}',
|
||||
fetched_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
UNIQUE (platform, platform_seller_id)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS market_comps (
|
||||
id BIGSERIAL PRIMARY KEY,
|
||||
platform TEXT NOT NULL,
|
||||
query_hash TEXT NOT NULL,
|
||||
median_price DOUBLE PRECISION NOT NULL,
|
||||
sample_count INTEGER NOT NULL,
|
||||
fetched_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
expires_at TIMESTAMPTZ NOT NULL,
|
||||
UNIQUE (platform, query_hash)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS reported_sellers (
|
||||
id BIGSERIAL PRIMARY KEY,
|
||||
platform TEXT NOT NULL,
|
||||
platform_seller_id TEXT NOT NULL,
|
||||
username TEXT,
|
||||
reported_by TEXT NOT NULL DEFAULT 'user',
|
||||
reported_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
UNIQUE (platform, platform_seller_id)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS scammer_blocklist (
|
||||
id BIGSERIAL PRIMARY KEY,
|
||||
platform TEXT NOT NULL,
|
||||
platform_seller_id TEXT NOT NULL,
|
||||
username TEXT NOT NULL,
|
||||
reason TEXT,
|
||||
source TEXT NOT NULL DEFAULT 'manual',
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
UNIQUE (platform, platform_seller_id)
|
||||
);
|
||||
|
||||
0
app/db/pg_migrations/__init__.py
Normal file
0
app/db/pg_migrations/__init__.py
Normal file
380
app/db/pg_shared.py
Normal file
380
app/db/pg_shared.py
Normal file
|
|
@ -0,0 +1,380 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
import psycopg2
|
||||
from psycopg2.pool import ThreadedConnectionPool
|
||||
|
||||
from app.db.models import MarketComp, ScammerEntry, Seller
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
_MIN_CONN = 2
|
||||
_MAX_CONN = 20
|
||||
|
||||
|
||||
class SnipeSharedDB:
|
||||
"""Thread-safe Postgres connection pool for Snipe shared tables."""
|
||||
|
||||
def __init__(self, dsn: str) -> None:
|
||||
self._pool = ThreadedConnectionPool(_MIN_CONN, _MAX_CONN, dsn=dsn)
|
||||
|
||||
def getconn(self):
|
||||
return self._pool.getconn()
|
||||
|
||||
def putconn(self, conn) -> None:
|
||||
self._pool.putconn(conn)
|
||||
|
||||
def close(self) -> None:
|
||||
self._pool.closeall()
|
||||
|
||||
def run_migrations(self) -> None:
|
||||
"""Apply pg_migrations/*.sql in filename order. Idempotent."""
|
||||
migrations_dir = Path(__file__).parent / "pg_migrations"
|
||||
files = sorted(migrations_dir.glob("*.sql"), key=lambda p: p.name)
|
||||
|
||||
conn = self.getconn()
|
||||
try:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("""
|
||||
CREATE TABLE IF NOT EXISTS _snipe_shared_migrations (
|
||||
filename TEXT PRIMARY KEY,
|
||||
applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||
)
|
||||
""")
|
||||
conn.commit()
|
||||
for f in files:
|
||||
cur.execute(
|
||||
"SELECT 1 FROM _snipe_shared_migrations WHERE filename = %s",
|
||||
(f.name,),
|
||||
)
|
||||
if cur.fetchone():
|
||||
continue
|
||||
log.info("Applying migration: %s", f.name)
|
||||
cur.execute(f.read_text())
|
||||
cur.execute(
|
||||
"INSERT INTO _snipe_shared_migrations (filename) VALUES (%s)",
|
||||
(f.name,),
|
||||
)
|
||||
conn.commit()
|
||||
except Exception:
|
||||
conn.rollback()
|
||||
raise
|
||||
finally:
|
||||
self.putconn(conn)
|
||||
|
||||
|
||||
class SnipeSharedStore:
|
||||
"""Postgres-backed store for sellers, market_comps, and reported_sellers.
|
||||
|
||||
Satisfies SharedTableProtocol. clone() returns self — ThreadedConnectionPool
|
||||
is already thread-safe, so no new instance is needed per thread.
|
||||
"""
|
||||
|
||||
def __init__(self, db: SnipeSharedDB) -> None:
|
||||
self._db = db
|
||||
|
||||
def clone(self) -> "SnipeSharedStore":
|
||||
return self
|
||||
|
||||
# Sellers
|
||||
|
||||
def save_seller(self, seller: Seller) -> None:
|
||||
self.save_sellers([seller])
|
||||
|
||||
def save_sellers(self, sellers: list[Seller]) -> None:
|
||||
if not sellers:
|
||||
return
|
||||
conn = self._db.getconn()
|
||||
try:
|
||||
with conn.cursor() as cur:
|
||||
cur.executemany(
|
||||
"""
|
||||
INSERT INTO sellers
|
||||
(platform, platform_seller_id, username, account_age_days,
|
||||
feedback_count, feedback_ratio, category_history_json)
|
||||
VALUES (%s, %s, %s, %s, %s, %s, %s)
|
||||
ON CONFLICT (platform, platform_seller_id) DO UPDATE SET
|
||||
username = EXCLUDED.username,
|
||||
feedback_count = EXCLUDED.feedback_count,
|
||||
feedback_ratio = EXCLUDED.feedback_ratio,
|
||||
account_age_days = COALESCE(
|
||||
EXCLUDED.account_age_days,
|
||||
sellers.account_age_days
|
||||
),
|
||||
category_history_json = COALESCE(
|
||||
NULLIF(NULLIF(EXCLUDED.category_history_json, '{}'), ''),
|
||||
NULLIF(NULLIF(sellers.category_history_json, '{}'), ''),
|
||||
'{}'
|
||||
),
|
||||
fetched_at = NOW()
|
||||
""",
|
||||
[
|
||||
(s.platform, s.platform_seller_id, s.username, s.account_age_days,
|
||||
s.feedback_count, s.feedback_ratio, s.category_history_json or "{}")
|
||||
for s in sellers
|
||||
],
|
||||
)
|
||||
conn.commit()
|
||||
except Exception:
|
||||
conn.rollback()
|
||||
raise
|
||||
finally:
|
||||
self._db.putconn(conn)
|
||||
|
||||
def get_seller(self, platform: str, platform_seller_id: str) -> Optional[Seller]:
|
||||
conn = self._db.getconn()
|
||||
try:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT platform, platform_seller_id, username, account_age_days,
|
||||
feedback_count, feedback_ratio, category_history_json,
|
||||
id, fetched_at
|
||||
FROM sellers
|
||||
WHERE platform = %s AND platform_seller_id = %s
|
||||
""",
|
||||
(platform, platform_seller_id),
|
||||
)
|
||||
row = cur.fetchone()
|
||||
if not row:
|
||||
return None
|
||||
return Seller(*row[:7], id=row[7], fetched_at=str(row[8]))
|
||||
finally:
|
||||
self._db.putconn(conn)
|
||||
|
||||
def delete_seller_data(self, platform: str, platform_seller_id: str) -> None:
|
||||
conn = self._db.getconn()
|
||||
try:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"DELETE FROM sellers WHERE platform = %s AND platform_seller_id = %s",
|
||||
(platform, platform_seller_id),
|
||||
)
|
||||
conn.commit()
|
||||
except Exception:
|
||||
conn.rollback()
|
||||
raise
|
||||
finally:
|
||||
self._db.putconn(conn)
|
||||
|
||||
# MarketComps
|
||||
|
||||
def save_market_comp(self, comp: MarketComp) -> None:
|
||||
conn = self._db.getconn()
|
||||
try:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
INSERT INTO market_comps
|
||||
(platform, query_hash, median_price, sample_count, expires_at)
|
||||
VALUES (%s, %s, %s, %s, %s::TIMESTAMPTZ)
|
||||
ON CONFLICT (platform, query_hash) DO UPDATE SET
|
||||
median_price = EXCLUDED.median_price,
|
||||
sample_count = EXCLUDED.sample_count,
|
||||
expires_at = EXCLUDED.expires_at,
|
||||
fetched_at = NOW()
|
||||
""",
|
||||
(comp.platform, comp.query_hash, comp.median_price,
|
||||
comp.sample_count, comp.expires_at),
|
||||
)
|
||||
conn.commit()
|
||||
except Exception:
|
||||
conn.rollback()
|
||||
raise
|
||||
finally:
|
||||
self._db.putconn(conn)
|
||||
|
||||
def get_market_comp(self, platform: str, query_hash: str) -> Optional[MarketComp]:
|
||||
now = datetime.now(timezone.utc).isoformat()
|
||||
conn = self._db.getconn()
|
||||
try:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT platform, query_hash, median_price, sample_count,
|
||||
expires_at, id, fetched_at
|
||||
FROM market_comps
|
||||
WHERE platform = %s AND query_hash = %s AND expires_at > %s::TIMESTAMPTZ
|
||||
""",
|
||||
(platform, query_hash, now),
|
||||
)
|
||||
row = cur.fetchone()
|
||||
if not row:
|
||||
return None
|
||||
return MarketComp(*row[:5], id=row[5], fetched_at=str(row[6]))
|
||||
finally:
|
||||
self._db.putconn(conn)
|
||||
|
||||
# Reported Sellers
|
||||
|
||||
def mark_reported(
|
||||
self,
|
||||
platform: str,
|
||||
platform_seller_id: str,
|
||||
username: Optional[str] = None,
|
||||
reported_by: str = "user",
|
||||
) -> None:
|
||||
conn = self._db.getconn()
|
||||
try:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
INSERT INTO reported_sellers
|
||||
(platform, platform_seller_id, username, reported_by)
|
||||
VALUES (%s, %s, %s, %s)
|
||||
ON CONFLICT (platform, platform_seller_id) DO NOTHING
|
||||
""",
|
||||
(platform, platform_seller_id, username, reported_by),
|
||||
)
|
||||
conn.commit()
|
||||
except Exception:
|
||||
conn.rollback()
|
||||
raise
|
||||
finally:
|
||||
self._db.putconn(conn)
|
||||
|
||||
def list_reported(self, platform: str = "ebay") -> list[str]:
|
||||
conn = self._db.getconn()
|
||||
try:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"SELECT platform_seller_id FROM reported_sellers WHERE platform = %s",
|
||||
(platform,),
|
||||
)
|
||||
return [row[0] for row in cur.fetchall()]
|
||||
finally:
|
||||
self._db.putconn(conn)
|
||||
|
||||
# Seller Category Refresh
|
||||
|
||||
def refresh_seller_categories(
|
||||
self,
|
||||
platform: str,
|
||||
seller_ids: list[str],
|
||||
listing_store=None, # always a SQLite Store in practice
|
||||
) -> int:
|
||||
"""Derive category_history_json from listing data and update sellers in Postgres.
|
||||
|
||||
listing_store must be provided (it's always the per-user SQLite Store).
|
||||
Returns count of sellers updated.
|
||||
"""
|
||||
from app.platforms.ebay.scraper import _classify_category_label # lazy to avoid circular
|
||||
import json
|
||||
|
||||
if not seller_ids or listing_store is None:
|
||||
return 0
|
||||
|
||||
updated = 0
|
||||
for sid in seller_ids:
|
||||
seller = self.get_seller(platform, sid)
|
||||
if not seller or seller.category_history_json not in ("{}", "", None):
|
||||
continue
|
||||
# listing_store is always a SQLite Store; access _conn directly for the query.
|
||||
rows = listing_store._conn.execute(
|
||||
"SELECT category_name, COUNT(*) FROM listings "
|
||||
"WHERE platform=? AND seller_platform_id=? AND category_name IS NOT NULL "
|
||||
"GROUP BY category_name",
|
||||
(platform, sid),
|
||||
).fetchall()
|
||||
if not rows:
|
||||
continue
|
||||
counts: dict[str, int] = {}
|
||||
for cat_name, cnt in rows:
|
||||
key = _classify_category_label(cat_name)
|
||||
if key:
|
||||
counts[key] = counts.get(key, 0) + cnt
|
||||
if counts:
|
||||
from dataclasses import replace
|
||||
self.save_sellers([replace(seller, category_history_json=json.dumps(counts))])
|
||||
updated += 1
|
||||
return updated
|
||||
|
||||
# Scammer Blocklist
|
||||
|
||||
def is_blocklisted(self, platform: str, platform_seller_id: str) -> bool:
|
||||
conn = self._db.getconn()
|
||||
try:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"SELECT 1 FROM scammer_blocklist "
|
||||
"WHERE platform = %s AND platform_seller_id = %s LIMIT 1",
|
||||
(platform, platform_seller_id),
|
||||
)
|
||||
return cur.fetchone() is not None
|
||||
finally:
|
||||
self._db.putconn(conn)
|
||||
|
||||
def add_to_blocklist(self, entry: ScammerEntry) -> ScammerEntry:
|
||||
conn = self._db.getconn()
|
||||
try:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
INSERT INTO scammer_blocklist
|
||||
(platform, platform_seller_id, username, reason, source)
|
||||
VALUES (%s, %s, %s, %s, %s)
|
||||
ON CONFLICT (platform, platform_seller_id) DO UPDATE SET
|
||||
username = EXCLUDED.username,
|
||||
reason = COALESCE(EXCLUDED.reason, scammer_blocklist.reason),
|
||||
source = EXCLUDED.source
|
||||
""",
|
||||
(entry.platform, entry.platform_seller_id, entry.username,
|
||||
entry.reason, entry.source),
|
||||
)
|
||||
conn.commit()
|
||||
cur.execute(
|
||||
"SELECT id, created_at FROM scammer_blocklist "
|
||||
"WHERE platform = %s AND platform_seller_id = %s",
|
||||
(entry.platform, entry.platform_seller_id),
|
||||
)
|
||||
row = cur.fetchone()
|
||||
from dataclasses import replace
|
||||
return replace(entry, id=row[0], created_at=str(row[1]))
|
||||
except Exception:
|
||||
conn.rollback()
|
||||
raise
|
||||
finally:
|
||||
self._db.putconn(conn)
|
||||
|
||||
def remove_from_blocklist(self, platform: str, platform_seller_id: str) -> None:
|
||||
conn = self._db.getconn()
|
||||
try:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"DELETE FROM scammer_blocklist "
|
||||
"WHERE platform = %s AND platform_seller_id = %s",
|
||||
(platform, platform_seller_id),
|
||||
)
|
||||
conn.commit()
|
||||
except Exception:
|
||||
conn.rollback()
|
||||
raise
|
||||
finally:
|
||||
self._db.putconn(conn)
|
||||
|
||||
def list_blocklist(self, platform: str = "ebay") -> list[ScammerEntry]:
|
||||
conn = self._db.getconn()
|
||||
try:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT platform, platform_seller_id, username, reason, source, id, created_at
|
||||
FROM scammer_blocklist
|
||||
WHERE platform = %s
|
||||
ORDER BY created_at DESC
|
||||
""",
|
||||
(platform,),
|
||||
)
|
||||
return [
|
||||
ScammerEntry(
|
||||
platform=r[0], platform_seller_id=r[1], username=r[2],
|
||||
reason=r[3], source=r[4], id=r[5], created_at=str(r[6]),
|
||||
)
|
||||
for r in cur.fetchall()
|
||||
]
|
||||
finally:
|
||||
self._db.putconn(conn)
|
||||
86
app/db/protocol.py
Normal file
86
app/db/protocol.py
Normal file
|
|
@ -0,0 +1,86 @@
|
|||
"""Protocol (duck-type interface) for shared table backends (SQLite and Postgres)."""
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any, Optional, Protocol, runtime_checkable
|
||||
|
||||
from app.db.models import MarketComp, ScammerEntry, Seller
|
||||
|
||||
|
||||
@runtime_checkable
|
||||
class SharedTableProtocol(Protocol):
|
||||
"""Protocol that both Store (SQLite) and SnipeSharedStore (Postgres) must satisfy.
|
||||
|
||||
This enables code that reads/writes shared tables (sellers, market_comps,
|
||||
reported_sellers, scammer_blocklist) to remain agnostic to the underlying backend.
|
||||
"""
|
||||
|
||||
def save_seller(self, seller: Seller) -> None:
|
||||
"""Persist a single seller record."""
|
||||
...
|
||||
|
||||
def save_sellers(self, sellers: list[Seller]) -> None:
|
||||
"""Persist multiple seller records (batch upsert)."""
|
||||
...
|
||||
|
||||
def get_seller(self, platform: str, platform_seller_id: str) -> Optional[Seller]:
|
||||
"""Fetch a single seller by platform and platform_seller_id."""
|
||||
...
|
||||
|
||||
def save_market_comp(self, comp: MarketComp) -> None:
|
||||
"""Persist a market comparison record."""
|
||||
...
|
||||
|
||||
def get_market_comp(self, platform: str, query_hash: str) -> Optional[MarketComp]:
|
||||
"""Fetch a market comparison by platform and query_hash."""
|
||||
...
|
||||
|
||||
def mark_reported(
|
||||
self,
|
||||
platform: str,
|
||||
platform_seller_id: str,
|
||||
username: Optional[str] = None,
|
||||
reported_by: str = "user",
|
||||
) -> None:
|
||||
"""Record that a seller has been reported to the platform."""
|
||||
...
|
||||
|
||||
def list_reported(self, platform: str = "ebay") -> list[str]:
|
||||
"""Return all platform_seller_ids that have been reported."""
|
||||
...
|
||||
|
||||
def delete_seller_data(self, platform: str, platform_seller_id: str) -> None:
|
||||
"""Permanently erase a seller and all related data (GDPR/eBay compliance)."""
|
||||
...
|
||||
|
||||
def refresh_seller_categories(
|
||||
self,
|
||||
platform: str,
|
||||
seller_ids: list[str],
|
||||
listing_store: Optional[Any] = None,
|
||||
) -> int:
|
||||
"""Derive category_history_json for sellers that lack it from stored listings.
|
||||
|
||||
listing_store: Store holding listings (may differ from self in split-DB mode).
|
||||
Returns count of sellers updated.
|
||||
"""
|
||||
...
|
||||
|
||||
def is_blocklisted(self, platform: str, platform_seller_id: str) -> bool:
|
||||
"""Return True if a seller is on the community scammer blocklist."""
|
||||
...
|
||||
|
||||
def add_to_blocklist(self, entry: ScammerEntry) -> ScammerEntry:
|
||||
"""Upsert a seller into the blocklist. Returns the saved entry with id and created_at."""
|
||||
...
|
||||
|
||||
def remove_from_blocklist(self, platform: str, platform_seller_id: str) -> None:
|
||||
"""Remove a seller from the blocklist."""
|
||||
...
|
||||
|
||||
def list_blocklist(self, platform: str = "ebay") -> list[ScammerEntry]:
|
||||
"""Return all blocklisted sellers for a platform, newest first."""
|
||||
...
|
||||
|
||||
def clone(self) -> SharedTableProtocol:
|
||||
"""Create a new independent instance pointing to the same backend."""
|
||||
...
|
||||
|
|
@ -21,6 +21,10 @@ class Store:
|
|||
# WAL mode: allows concurrent readers + one writer without blocking
|
||||
self._conn.execute("PRAGMA journal_mode=WAL")
|
||||
|
||||
def clone(self) -> Store:
|
||||
"""Create a new independent instance pointing to the same database."""
|
||||
return Store(self._db_path)
|
||||
|
||||
# --- Seller ---
|
||||
|
||||
def delete_seller_data(self, platform: str, platform_seller_id: str) -> None:
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ Snipe LLMRouter shim — tri-level config path priority.
|
|||
Config lookup order:
|
||||
1. <repo>/config/llm.yaml — per-install local override
|
||||
2. ~/.config/circuitforge/llm.yaml — user-level config (circuitforge-core default)
|
||||
3. env-var auto-config (ANTHROPIC_API_KEY, OPENAI_API_KEY, OLLAMA_HOST, CF_ORCH_URL)
|
||||
3. env-var auto-config (ANTHROPIC_API_KEY, OPENAI_API_KEY, OLLAMA_HOST, GPU_SERVER_URL)
|
||||
"""
|
||||
from pathlib import Path
|
||||
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ _SHOPPING_API_INTER_REQUEST_DELAY = 0.5 # seconds between successive calls
|
|||
_SELLER_ENRICH_TTL_HOURS = 24 # skip re-enrichment within this window
|
||||
|
||||
from app.db.models import Listing, MarketComp, Seller
|
||||
from app.db.store import Store
|
||||
from app.db.protocol import SharedTableProtocol
|
||||
from app.platforms import PlatformAdapter, SearchFilters
|
||||
from app.platforms.ebay.auth import EbayTokenManager
|
||||
from app.platforms.ebay.normaliser import normalise_listing, normalise_seller
|
||||
|
|
@ -67,7 +67,7 @@ BROWSE_BASE = {
|
|||
|
||||
|
||||
class EbayAdapter(PlatformAdapter):
|
||||
def __init__(self, token_manager: EbayTokenManager, shared_store: Store, env: str = "production"):
|
||||
def __init__(self, token_manager: EbayTokenManager, shared_store: SharedTableProtocol, env: str = "production"):
|
||||
self._tokens = token_manager
|
||||
self._store = shared_store
|
||||
self._env = env
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ log = logging.getLogger(__name__)
|
|||
from bs4 import BeautifulSoup
|
||||
|
||||
from app.db.models import Listing, MarketComp, Seller
|
||||
from app.db.store import Store
|
||||
from app.db.protocol import SharedTableProtocol
|
||||
from app.platforms import PlatformAdapter, SearchFilters
|
||||
|
||||
EBAY_SEARCH_URL = "https://www.ebay.com/sch/i.html"
|
||||
|
|
@ -286,7 +286,7 @@ class ScrapedEbayAdapter(PlatformAdapter):
|
|||
category_history) cause TrustScorer to set score_is_partial=True.
|
||||
"""
|
||||
|
||||
def __init__(self, shared_store: Store, delay: float = 1.0):
|
||||
def __init__(self, shared_store: SharedTableProtocol, delay: float = 1.0):
|
||||
self._store = shared_store
|
||||
self._delay = delay
|
||||
|
||||
|
|
@ -374,8 +374,6 @@ class ScrapedEbayAdapter(PlatformAdapter):
|
|||
Does not raise — failures per-seller are silently skipped so the main
|
||||
search response is never blocked.
|
||||
"""
|
||||
db_path = self._store._db_path # capture for thread-local Store creation
|
||||
|
||||
def _enrich_one(item: tuple[str, str]) -> None:
|
||||
seller_id, listing_id = item
|
||||
try:
|
||||
|
|
@ -388,7 +386,7 @@ class ScrapedEbayAdapter(PlatformAdapter):
|
|||
)
|
||||
if age_days is None and fb_count is None:
|
||||
return # nothing new to write
|
||||
thread_store = Store(db_path)
|
||||
thread_store = self._store.clone()
|
||||
seller = thread_store.get_seller("ebay", seller_id)
|
||||
if not seller:
|
||||
log.warning("BTF enrich: seller %s not found in DB", seller_id)
|
||||
|
|
|
|||
|
|
@ -8,9 +8,9 @@ Current task types:
|
|||
result to trust_scores.photo_analysis_json (Paid tier).
|
||||
|
||||
Image assessment routing:
|
||||
Cloud (CF_ORCH_URL set): allocates via cf-orch task endpoint
|
||||
Cloud (GPU_SERVER_URL set): allocates via cf-orch task endpoint
|
||||
product=snipe, task=image_assessment.
|
||||
Local (no CF_ORCH_URL) or TaskNotFound fallback: uses LLMRouter
|
||||
Local (no GPU_SERVER_URL) or TaskNotFound fallback: uses LLMRouter
|
||||
with a vision-capable local backend (moondream2, llava, etc.).
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
|
@ -135,7 +135,7 @@ def _run_trust_photo_analysis(
|
|||
if listing_title:
|
||||
user_prompt = f"Assess this eBay listing image: {listing_title}"
|
||||
|
||||
cforch_url = os.getenv("CF_ORCH_URL")
|
||||
cforch_url = os.getenv("GPU_SERVER_URL") or os.getenv("CF_ORCH_URL")
|
||||
if cforch_url:
|
||||
raw = _assess_via_orch(cforch_url, image_data_url, user_prompt)
|
||||
else:
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@ import hashlib
|
|||
import math
|
||||
|
||||
from app.db.models import Listing, TrustScore
|
||||
from app.db.store import Store
|
||||
from app.db.protocol import SharedTableProtocol
|
||||
|
||||
from .aggregator import Aggregator
|
||||
from .metadata import MetadataScorer
|
||||
|
|
@ -12,7 +12,7 @@ from .photo import PhotoScorer
|
|||
class TrustScorer:
|
||||
"""Orchestrates metadata + photo scoring for a batch of listings."""
|
||||
|
||||
def __init__(self, shared_store: Store):
|
||||
def __init__(self, shared_store: SharedTableProtocol):
|
||||
self._store = shared_store
|
||||
self._meta = MetadataScorer()
|
||||
self._photo = PhotoScorer()
|
||||
|
|
|
|||
|
|
@ -20,9 +20,12 @@ services:
|
|||
CLOUD_MODE: "true"
|
||||
CLOUD_DATA_ROOT: /devl/snipe-cloud-data
|
||||
# DIRECTUS_JWT_SECRET, HEIMDALL_URL, HEIMDALL_ADMIN_TOKEN — set in .env (never commit)
|
||||
# CF_ORCH_URL routes LLM query builder through cf-orch for VRAM-aware scheduling.
|
||||
# GPU_SERVER_URL routes LLM query builder through cf-orch for VRAM-aware scheduling.
|
||||
# Override in .env to use a different coordinator URL.
|
||||
CF_ORCH_URL: "http://host.docker.internal:7700"
|
||||
GPU_SERVER_URL: "http://host.docker.internal:7700"
|
||||
# SNIPE_SHARED_DB_URL — Postgres DSN for shared tables (sellers, market_comps, blocklist).
|
||||
# Required for production multi-user deployments. Set in .env (never commit).
|
||||
# SNIPE_SHARED_DB_URL: "postgresql://snipe:<password>@postgres:5432/snipe_shared"
|
||||
CF_APP_NAME: snipe
|
||||
extra_hosts:
|
||||
- "host.docker.internal:host-gateway"
|
||||
|
|
|
|||
|
|
@ -18,8 +18,8 @@ services:
|
|||
environment:
|
||||
- RELOAD=true
|
||||
# Point the LLM/vision task scheduler at the local cf-orch coordinator.
|
||||
# Only has effect when CF_ORCH_URL is set (uncomment in .env, or set inline).
|
||||
# - CF_ORCH_URL=http://10.1.10.71:7700
|
||||
# Only has effect when GPU_SERVER_URL is set (uncomment in .env, or set inline).
|
||||
# - GPU_SERVER_URL=http://10.1.10.71:7700
|
||||
|
||||
# cf-orch agent — routes trust_photo_analysis vision tasks to the GPU coordinator.
|
||||
# Only starts when you pass --profile orch:
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@
|
|||
# (claude_code, copilot) are intentionally excluded here.
|
||||
#
|
||||
# CF Orchestrator routes both ollama and vllm allocations for VRAM-aware
|
||||
# scheduling. CF_ORCH_URL must be set in .env for allocations to resolve;
|
||||
# scheduling. GPU_SERVER_URL must be set in .env for allocations to resolve;
|
||||
# if cf-orch is unreachable the backend falls back to its static base_url.
|
||||
#
|
||||
# Model choice for query builder: llama3.1:8b
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ version = "0.3.0"
|
|||
description = "Auction listing monitor and trust scorer"
|
||||
requires-python = ">=3.11"
|
||||
dependencies = [
|
||||
"circuitforge-core>=0.8.0",
|
||||
"circuitforge-core[community]>=0.8.0",
|
||||
"streamlit>=1.32",
|
||||
"requests>=2.31",
|
||||
"imagehash>=4.3",
|
||||
|
|
@ -24,10 +24,15 @@ dependencies = [
|
|||
"cryptography>=42.0",
|
||||
"PyJWT>=2.8",
|
||||
"httpx>=0.27",
|
||||
"circuitforge-orch>=0.1.0",
|
||||
]
|
||||
|
||||
[project.optional-dependencies]
|
||||
orchestration = [
|
||||
# Paid+ tier only — not published to PyPI. Install from source or Forgejo Packages.
|
||||
# pip install -e ../circuitforge-orch (dev)
|
||||
# pip install snipe[orchestration] (self-hosted Paid+)
|
||||
"circuitforge-orch>=0.1.0",
|
||||
]
|
||||
dev = [
|
||||
"pytest>=8.0",
|
||||
"pytest-cov>=5.0",
|
||||
|
|
|
|||
17
tests/conftest.py
Normal file
17
tests/conftest.py
Normal file
|
|
@ -0,0 +1,17 @@
|
|||
import os
|
||||
import pytest
|
||||
|
||||
|
||||
def pytest_configure(config):
|
||||
config.addinivalue_line(
|
||||
"markers",
|
||||
"postgres: mark test as requiring a live Postgres instance (SNIPE_SHARED_DB_URL must be set)",
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def postgres_dsn():
|
||||
dsn = os.environ.get("SNIPE_SHARED_DB_URL")
|
||||
if not dsn:
|
||||
pytest.skip("SNIPE_SHARED_DB_URL not set — skipping Postgres tests")
|
||||
return dsn
|
||||
157
tests/db/test_pg_shared.py
Normal file
157
tests/db/test_pg_shared.py
Normal file
|
|
@ -0,0 +1,157 @@
|
|||
"""Tests for SnipeSharedStore — requires live Postgres via SNIPE_SHARED_DB_URL."""
|
||||
import pytest
|
||||
from app.db.models import MarketComp, Seller
|
||||
from app.db.pg_shared import SnipeSharedDB, SnipeSharedStore
|
||||
from app.db.protocol import SharedTableProtocol
|
||||
|
||||
|
||||
@pytest.mark.postgres
|
||||
def test_snipe_shared_store_satisfies_protocol(postgres_dsn):
|
||||
assert issubclass(SnipeSharedStore, SharedTableProtocol)
|
||||
|
||||
|
||||
@pytest.mark.postgres
|
||||
def test_save_and_get_seller(postgres_dsn):
|
||||
db = SnipeSharedDB(postgres_dsn)
|
||||
db.run_migrations()
|
||||
store = SnipeSharedStore(db)
|
||||
|
||||
seller = Seller(
|
||||
platform="ebay",
|
||||
platform_seller_id="test-seller-001",
|
||||
username="testseller",
|
||||
account_age_days=365,
|
||||
feedback_count=100,
|
||||
feedback_ratio=0.99,
|
||||
category_history_json='{"electronics": 5}',
|
||||
)
|
||||
store.save_seller(seller)
|
||||
|
||||
result = store.get_seller("ebay", "test-seller-001")
|
||||
assert result is not None
|
||||
assert result.username == "testseller"
|
||||
assert result.feedback_count == 100
|
||||
|
||||
store.delete_seller_data("ebay", "test-seller-001")
|
||||
db.close()
|
||||
|
||||
|
||||
@pytest.mark.postgres
|
||||
def test_save_sellers_coalesce_preserves_age(postgres_dsn):
|
||||
db = SnipeSharedDB(postgres_dsn)
|
||||
db.run_migrations()
|
||||
store = SnipeSharedStore(db)
|
||||
|
||||
seller_with_age = Seller(
|
||||
platform="ebay", platform_seller_id="coalesce-test",
|
||||
username="u", account_age_days=730,
|
||||
feedback_count=50, feedback_ratio=0.95, category_history_json="{}",
|
||||
)
|
||||
store.save_seller(seller_with_age)
|
||||
|
||||
seller_without_age = Seller(
|
||||
platform="ebay", platform_seller_id="coalesce-test",
|
||||
username="u", account_age_days=None,
|
||||
feedback_count=60, feedback_ratio=0.96, category_history_json="{}",
|
||||
)
|
||||
store.save_sellers([seller_without_age])
|
||||
|
||||
result = store.get_seller("ebay", "coalesce-test")
|
||||
assert result.account_age_days == 730
|
||||
assert result.feedback_count == 60
|
||||
|
||||
store.delete_seller_data("ebay", "coalesce-test")
|
||||
db.close()
|
||||
|
||||
|
||||
@pytest.mark.postgres
|
||||
def test_market_comp_cache(postgres_dsn):
|
||||
from datetime import datetime, timedelta, timezone
|
||||
db = SnipeSharedDB(postgres_dsn)
|
||||
db.run_migrations()
|
||||
store = SnipeSharedStore(db)
|
||||
|
||||
expires = (datetime.now(timezone.utc) + timedelta(hours=1)).isoformat()
|
||||
comp = MarketComp(
|
||||
platform="ebay", query_hash="abc123",
|
||||
median_price=49.99, sample_count=10, expires_at=expires,
|
||||
)
|
||||
store.save_market_comp(comp)
|
||||
|
||||
result = store.get_market_comp("ebay", "abc123")
|
||||
assert result is not None
|
||||
assert result.median_price == 49.99
|
||||
|
||||
db.close()
|
||||
|
||||
|
||||
@pytest.mark.postgres
|
||||
def test_reported_sellers(postgres_dsn):
|
||||
db = SnipeSharedDB(postgres_dsn)
|
||||
db.run_migrations()
|
||||
store = SnipeSharedStore(db)
|
||||
|
||||
store.mark_reported("ebay", "bad-seller-99", username="badguy")
|
||||
reported = store.list_reported("ebay")
|
||||
assert "bad-seller-99" in reported
|
||||
|
||||
store.mark_reported("ebay", "bad-seller-99") # idempotent
|
||||
|
||||
db.close()
|
||||
|
||||
|
||||
@pytest.mark.postgres
|
||||
def test_clone_returns_self(postgres_dsn):
|
||||
db = SnipeSharedDB(postgres_dsn)
|
||||
store = SnipeSharedStore(db)
|
||||
assert store.clone() is store
|
||||
db.close()
|
||||
|
||||
|
||||
@pytest.mark.postgres
|
||||
def test_blocklist_add_get_remove(postgres_dsn):
|
||||
from app.db.models import ScammerEntry
|
||||
db = SnipeSharedDB(postgres_dsn)
|
||||
db.run_migrations()
|
||||
store = SnipeSharedStore(db)
|
||||
|
||||
assert not store.is_blocklisted("ebay", "bad-999")
|
||||
|
||||
entry = store.add_to_blocklist(ScammerEntry(
|
||||
platform="ebay", platform_seller_id="bad-999",
|
||||
username="scammer1", reason="sold fakes", source="manual",
|
||||
))
|
||||
assert entry.id is not None
|
||||
assert store.is_blocklisted("ebay", "bad-999")
|
||||
|
||||
entries = store.list_blocklist("ebay")
|
||||
assert any(e.platform_seller_id == "bad-999" for e in entries)
|
||||
|
||||
store.remove_from_blocklist("ebay", "bad-999")
|
||||
assert not store.is_blocklisted("ebay", "bad-999")
|
||||
db.close()
|
||||
|
||||
|
||||
@pytest.mark.postgres
|
||||
def test_blocklist_upsert_is_idempotent(postgres_dsn):
|
||||
from app.db.models import ScammerEntry
|
||||
db = SnipeSharedDB(postgres_dsn)
|
||||
db.run_migrations()
|
||||
store = SnipeSharedStore(db)
|
||||
|
||||
store.add_to_blocklist(ScammerEntry(
|
||||
platform="ebay", platform_seller_id="dup-test",
|
||||
username="seller", reason="reason1", source="manual",
|
||||
))
|
||||
# Second add — should not raise, should update username but preserve reason via COALESCE
|
||||
store.add_to_blocklist(ScammerEntry(
|
||||
platform="ebay", platform_seller_id="dup-test",
|
||||
username="seller_updated", reason=None, source="community",
|
||||
))
|
||||
entries = [e for e in store.list_blocklist("ebay") if e.platform_seller_id == "dup-test"]
|
||||
assert len(entries) == 1
|
||||
assert entries[0].username == "seller_updated"
|
||||
assert entries[0].reason == "reason1" # COALESCE preserved original reason
|
||||
|
||||
store.remove_from_blocklist("ebay", "dup-test")
|
||||
db.close()
|
||||
39
tests/db/test_protocol.py
Normal file
39
tests/db/test_protocol.py
Normal file
|
|
@ -0,0 +1,39 @@
|
|||
"""Verify Store satisfies SharedTableProtocol at import time."""
|
||||
from app.db.protocol import SharedTableProtocol
|
||||
from app.db.store import Store
|
||||
|
||||
|
||||
def test_store_satisfies_protocol():
|
||||
assert issubclass(Store, SharedTableProtocol)
|
||||
|
||||
|
||||
def test_store_clone_returns_new_instance(tmp_path):
|
||||
db = tmp_path / "test.db"
|
||||
s = Store(db)
|
||||
clone = s.clone()
|
||||
assert isinstance(clone, Store)
|
||||
assert clone is not s
|
||||
assert clone._db_path == db
|
||||
|
||||
|
||||
def test_ebay_adapter_accepts_protocol():
|
||||
from app.platforms.ebay.adapter import EbayAdapter
|
||||
import tempfile
|
||||
import pathlib
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
s = Store(pathlib.Path(tmp) / "t.db")
|
||||
adapter = EbayAdapter(token_manager=MagicMock(), shared_store=s)
|
||||
assert adapter._store is s
|
||||
|
||||
|
||||
def test_scraped_adapter_no_db_path_ref():
|
||||
from app.platforms.ebay.scraper import ScrapedEbayAdapter
|
||||
import tempfile
|
||||
import pathlib
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
s = Store(pathlib.Path(tmp) / "t.db")
|
||||
adapter = ScrapedEbayAdapter(shared_store=s)
|
||||
assert not hasattr(adapter, '_db_path_ref')
|
||||
|
|
@ -173,7 +173,7 @@ def _make_orch_client_mock(vision_json: str) -> MagicMock:
|
|||
|
||||
|
||||
def test_run_task_photo_analysis_orch_success(tmp_db: Path):
|
||||
"""Cloud path: CFOrchClient.task_allocate is used when CF_ORCH_URL is set."""
|
||||
"""Cloud path: CFOrchClient.task_allocate is used when GPU_SERVER_URL is set."""
|
||||
task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS)
|
||||
|
||||
chat_resp = MagicMock()
|
||||
|
|
@ -181,7 +181,7 @@ def test_run_task_photo_analysis_orch_success(tmp_db: Path):
|
|||
chat_resp.raise_for_status = MagicMock()
|
||||
|
||||
with patch("app.tasks.runner.requests") as mock_req, \
|
||||
patch.dict("os.environ", {"CF_ORCH_URL": "http://cf-orch.local:8700"}), \
|
||||
patch.dict("os.environ", {"GPU_SERVER_URL": "http://cf-orch.local:8700"}), \
|
||||
patch("app.tasks.runner.httpx") as mock_httpx, \
|
||||
patch("circuitforge_orch.client.CFOrchClient") as MockClient:
|
||||
|
||||
|
|
@ -216,7 +216,7 @@ def test_run_task_photo_analysis_orch_uses_image_assessment_task(tmp_db: Path):
|
|||
chat_resp.raise_for_status = MagicMock()
|
||||
|
||||
with patch("app.tasks.runner.requests") as mock_req, \
|
||||
patch.dict("os.environ", {"CF_ORCH_URL": "http://cf-orch.local:8700"}), \
|
||||
patch.dict("os.environ", {"GPU_SERVER_URL": "http://cf-orch.local:8700"}), \
|
||||
patch("app.tasks.runner.httpx") as mock_httpx, \
|
||||
patch("circuitforge_orch.client.CFOrchClient") as MockClient:
|
||||
|
||||
|
|
@ -248,7 +248,7 @@ def test_run_task_photo_analysis_orch_sends_image_url_content(tmp_db: Path):
|
|||
return resp
|
||||
|
||||
with patch("app.tasks.runner.requests") as mock_req, \
|
||||
patch.dict("os.environ", {"CF_ORCH_URL": "http://cf-orch.local:8700"}), \
|
||||
patch.dict("os.environ", {"GPU_SERVER_URL": "http://cf-orch.local:8700"}), \
|
||||
patch("app.tasks.runner.httpx") as mock_httpx, \
|
||||
patch("circuitforge_orch.client.CFOrchClient") as MockClient:
|
||||
|
||||
|
|
@ -282,7 +282,7 @@ def test_run_task_photo_analysis_orch_task_not_found_falls_back(tmp_db: Path):
|
|||
client_instance.task_allocate.return_value = cm
|
||||
|
||||
with patch("app.tasks.runner.requests") as mock_req, \
|
||||
patch.dict("os.environ", {"CF_ORCH_URL": "http://cf-orch.local:8700"}), \
|
||||
patch.dict("os.environ", {"GPU_SERVER_URL": "http://cf-orch.local:8700"}), \
|
||||
patch("circuitforge_orch.client.CFOrchClient", return_value=client_instance), \
|
||||
patch("app.tasks.runner._assess_via_local_llm", return_value=_VISION_JSON) as mock_local:
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue