Compare commits

..

No commits in common. "9bd6d9513e73793e103393bb5b09b905a5be835e" and "4dd44fdafb0f50eba4718d5ba87a58e13b47f9b5" have entirely different histories.

22 changed files with 53 additions and 832 deletions

View file

@ -98,25 +98,16 @@ CF_APP_NAME=snipe
# OLLAMA_HOST=http://localhost:11434 # OLLAMA_HOST=http://localhost:11434
# OLLAMA_MODEL=llava:7b # OLLAMA_MODEL=llava:7b
# GPU Server — routes vision/LLM tasks to a cf-orch coordinator for VRAM management. # CF Orchestrator — routes vision/LLM tasks to a cf-orch coordinator for VRAM management.
# Self-hosted: point at a local cf-orch coordinator if you have one running. # Self-hosted: point at a local cf-orch coordinator if you have one running.
# Cloud (internal): managed coordinator at orch.circuitforge.tech. # Cloud (internal): managed coordinator at orch.circuitforge.tech.
# Leave unset to run vision tasks inline (no VRAM coordination). # Leave unset to run vision tasks inline (no VRAM coordination).
# GPU_SERVER_URL=http://10.1.10.71:7700 # CF_ORCH_URL=http://10.1.10.71:7700
#
# CF_ORCH_URL is accepted as a backward-compat alias for GPU_SERVER_URL.
# #
# cf-orch agent (compose --profile orch) — coordinator URL for the sidecar agent. # cf-orch agent (compose --profile orch) — coordinator URL for the sidecar agent.
# Defaults to GPU_SERVER_URL if unset. # Defaults to CF_ORCH_URL if unset.
# CF_ORCH_COORDINATOR_URL=http://10.1.10.71:7700 # CF_ORCH_COORDINATOR_URL=http://10.1.10.71:7700
# ── Shared Postgres (optional — strongly recommended for cloud/multi-user) ────
# When set, sellers, market_comps, reported_sellers, and scammer_blocklist are
# stored in Postgres instead of SQLite. Required to avoid database-locked errors
# under concurrent load (>10 simultaneous search users).
# Cloud instances: set to the cf-postgres DSN. Self-hosted: leave unset for SQLite.
# SNIPE_SHARED_DB_URL=postgresql://snipe:<password>@localhost:5432/snipe_shared
# ── Community DB (optional) ────────────────────────────────────────────────── # ── Community DB (optional) ──────────────────────────────────────────────────
# When set, seller trust signals (confirmed scammers added to blocklist) are # When set, seller trust signals (confirmed scammers added to blocklist) are
# published to the shared community PostgreSQL for cross-user signal aggregation. # published to the shared community PostgreSQL for cross-user signal aggregation.

View file

@ -5,7 +5,6 @@ WORKDIR /app
# System deps for Playwright/Chromium # System deps for Playwright/Chromium
RUN apt-get update && apt-get install -y --no-install-recommends \ RUN apt-get update && apt-get install -y --no-install-recommends \
xvfb \ xvfb \
libpq-dev \
&& rm -rf /var/lib/apt/lists/* && rm -rf /var/lib/apt/lists/*
# Install circuitforge-core from sibling directory (compose sets context: ..) # Install circuitforge-core from sibling directory (compose sets context: ..)

View file

@ -174,8 +174,6 @@ Snipe uses a dual license:
| Discovery pipeline — scrapers, platform adapters, search, keyword filtering | [MIT](LICENSE-MIT) | | Discovery pipeline — scrapers, platform adapters, search, keyword filtering | [MIT](LICENSE-MIT) |
| LLM trust-scoring, query builder, vision assessment, AI features | [BSL 1.1](LICENSE-BSL) — free for personal non-commercial self-hosting; commercial use requires a paid license; converts to MIT after 4 years | | LLM trust-scoring, query builder, vision assessment, AI features | [BSL 1.1](LICENSE-BSL) — free for personal non-commercial self-hosting; commercial use requires a paid license; converts to MIT after 4 years |
Humans own design, architecture, code review, testing, and verification. LLMs are part of our development workflow. [Our positions on LLM use →](https://circuitforge.tech/positions)
Privacy · Safety · Accessibility — co-equal, non-negotiable. Privacy · Safety · Accessibility — co-equal, non-negotiable.
[circuitforge.tech](https://circuitforge.tech) [circuitforge.tech](https://circuitforge.tech)

View file

@ -33,7 +33,6 @@ from api.cloud_session import CloudUser, compute_features, get_session
from api.ebay_webhook import router as ebay_webhook_router from api.ebay_webhook import router as ebay_webhook_router
from app.db.models import SavedSearch as SavedSearchModel from app.db.models import SavedSearch as SavedSearchModel
from app.db.models import ScammerEntry from app.db.models import ScammerEntry
from app.db.protocol import SharedTableProtocol
from app.db.store import Store from app.db.store import Store
from app.platforms import SUPPORTED_PLATFORMS, SearchFilters from app.platforms import SUPPORTED_PLATFORMS, SearchFilters
from app.platforms.ebay.adapter import EbayAdapter from app.platforms.ebay.adapter import EbayAdapter
@ -143,19 +142,6 @@ def _get_community_store() -> "SnipeCommunityStore | None":
return _community_store return _community_store
# ── Shared Postgres backend (optional — active when SNIPE_SHARED_DB_URL is set) ─
# Replaces the SQLite shared.db for sellers, market_comps, reported_sellers, and
# scammer_blocklist. ThreadedConnectionPool is thread-safe; one instance per process.
_pg_shared_store: "SharedTableProtocol | None" = None
def _make_shared_store(path: Path) -> SharedTableProtocol:
"""Return the active shared backend — Postgres if configured, SQLite otherwise."""
if _pg_shared_store is not None:
return _pg_shared_store
return Store(path)
# ── LLM Query Builder singletons (optional — requires LLM backend) ──────────── # ── LLM Query Builder singletons (optional — requires LLM backend) ────────────
_category_cache = None _category_cache = None
_query_translator = None _query_translator = None
@ -167,7 +153,7 @@ def _get_query_translator():
@asynccontextmanager @asynccontextmanager
async def _lifespan(app: FastAPI): async def _lifespan(app: FastAPI):
global _community_store, _pg_shared_store global _community_store
# Pre-warm the Chromium browser pool so the first scrape request does not # Pre-warm the Chromium browser pool so the first scrape request does not
# pay the full cold-start cost (5-10s Xvfb + browser launch). # pay the full cold-start cost (5-10s Xvfb + browser launch).
# Pool size is controlled via BROWSER_POOL_SIZE env var (default: 2). # Pool size is controlled via BROWSER_POOL_SIZE env var (default: 2).
@ -192,21 +178,6 @@ async def _lifespan(app: FastAPI):
get_scheduler(sched_db) get_scheduler(sched_db)
log.info("Snipe task scheduler started (db=%s)", sched_db) log.info("Snipe task scheduler started (db=%s)", sched_db)
# Shared Postgres backend — optional. Replaces SQLite for sellers, market_comps,
# reported_sellers, and scammer_blocklist under concurrent load.
snipe_shared_dsn = os.environ.get("SNIPE_SHARED_DB_URL", "")
if snipe_shared_dsn:
try:
from app.db.pg_shared import SnipeSharedDB, SnipeSharedStore as _SnipeSharedStore
_pg_db = SnipeSharedDB(snipe_shared_dsn)
_pg_db.run_migrations()
_pg_shared_store = _SnipeSharedStore(_pg_db)
log.info("Shared Postgres backend ready (sellers, market_comps, blocklist)")
except Exception:
log.exception(
"SNIPE_SHARED_DB_URL set but Postgres init failed — falling back to SQLite"
)
# Community DB — optional. Skipped gracefully if COMMUNITY_DB_URL is unset. # Community DB — optional. Skipped gracefully if COMMUNITY_DB_URL is unset.
community_db_url = os.environ.get("COMMUNITY_DB_URL", "") community_db_url = os.environ.get("COMMUNITY_DB_URL", "")
if community_db_url: if community_db_url:
@ -238,7 +209,7 @@ async def _lifespan(app: FastAPI):
_category_cache.refresh(token_manager=None) # bootstrap fallback _category_cache.refresh(token_manager=None) # bootstrap fallback
try: try:
cforch_url = os.getenv("GPU_SERVER_URL") or os.getenv("CF_ORCH_URL") or None cforch_url = os.getenv("CF_ORCH_URL") or None
if cforch_url: if cforch_url:
_query_translator = QueryTranslator( _query_translator = QueryTranslator(
category_cache=_category_cache, category_cache=_category_cache,
@ -475,7 +446,7 @@ def session_info(response: Response, session: CloudUser = Depends(get_session)):
def _trigger_scraper_enrichment( def _trigger_scraper_enrichment(
listings: list, listings: list,
shared_store: SharedTableProtocol, shared_store: Store,
shared_db: Path, shared_db: Path,
user_db: Path | None = None, user_db: Path | None = None,
query: str = "", query: str = "",
@ -541,7 +512,7 @@ def _trigger_scraper_enrichment(
if not session_id or session_id not in _update_queues: if not session_id or session_id not in _update_queues:
return return
q = _update_queues[session_id] q = _update_queues[session_id]
thread_shared = shared_store.clone() thread_shared = Store(shared_db)
thread_user = Store(user_db or shared_db) thread_user = Store(user_db or shared_db)
scorer = TrustScorer(thread_shared) scorer = TrustScorer(thread_shared)
comp = thread_shared.get_market_comp("ebay", hashlib.md5(query.encode()).hexdigest()) comp = thread_shared.get_market_comp("ebay", hashlib.md5(query.encode()).hexdigest())
@ -567,7 +538,7 @@ def _trigger_scraper_enrichment(
def _run(): def _run():
try: try:
enricher = ScrapedEbayAdapter(shared_store.clone()) enricher = ScrapedEbayAdapter(Store(shared_db))
if needs_btf: if needs_btf:
enricher.enrich_sellers_btf(needs_btf, max_workers=2) enricher.enrich_sellers_btf(needs_btf, max_workers=2)
log.info("BTF enrichment complete for %d sellers", len(needs_btf)) log.info("BTF enrichment complete for %d sellers", len(needs_btf))
@ -841,7 +812,7 @@ def search(
_update_queues[session_id] = _queue.SimpleQueue() _update_queues[session_id] = _queue.SimpleQueue()
try: try:
shared_store = _make_shared_store(shared_db) shared_store = Store(shared_db)
user_store = Store(user_db) user_store = Store(user_db)
# Re-hydrate Listing dataclass instances from the cached dicts so the # Re-hydrate Listing dataclass instances from the cached dicts so the
@ -926,14 +897,13 @@ def search(
_evict_expired_cache() _evict_expired_cache()
log.info("cache: miss key=%s q=%r", cache_key, q) log.info("cache: miss key=%s q=%r", cache_key, q)
# Each thread creates its own store via clone() — sqlite3 check_same_thread=True; # Each thread creates its own Store — sqlite3 check_same_thread=True.
# SnipeSharedStore.clone() returns self (ThreadedConnectionPool is thread-safe).
def _run_search(ebay_query: str) -> list: def _run_search(ebay_query: str) -> list:
return _make_adapter(_make_shared_store(shared_db), adapter, platform=platform).search(ebay_query, base_filters) return _make_adapter(Store(shared_db), adapter, platform=platform).search(ebay_query, base_filters)
def _run_comps() -> None: def _run_comps() -> None:
try: try:
_make_adapter(_make_shared_store(shared_db), adapter, platform=platform).get_completed_sales(comp_query, pages) _make_adapter(Store(shared_db), adapter, platform=platform).get_completed_sales(comp_query, pages)
except Exception: except Exception:
log.warning("comps: unhandled exception for %r", comp_query, exc_info=True) log.warning("comps: unhandled exception for %r", comp_query, exc_info=True)
@ -974,9 +944,10 @@ def search(
_update_queues[session_id] = _queue.SimpleQueue() _update_queues[session_id] = _queue.SimpleQueue()
try: try:
# Main-thread stores — shared_store may be Postgres (sellers, market_comps); # Main-thread stores — fresh connections, same thread.
# user_store is always per-user SQLite (listings, trust_scores, saved_searches). # shared_store: sellers, market_comps (all users share this data)
shared_store = _make_shared_store(shared_db) # user_store: listings, saved_searches (per-user in cloud mode, same file in local mode)
shared_store = Store(shared_db)
user_store = Store(user_db) user_store = Store(user_db)
user_store.save_listings(listings) user_store.save_listings(listings)
@ -1236,7 +1207,7 @@ def search_async(
cached_listings_raw = payload["listings"] cached_listings_raw = payload["listings"]
cached_market_price = payload["market_price"] cached_market_price = payload["market_price"]
try: try:
shared_store = _make_shared_store(_shared_db) shared_store = Store(_shared_db)
user_store = Store(_user_db) user_store = Store(_user_db)
listings = [_Listing(**d) for d in cached_listings_raw] listings = [_Listing(**d) for d in cached_listings_raw]
user_store.save_listings(listings) user_store.save_listings(listings)
@ -1316,11 +1287,11 @@ def search_async(
try: try:
def _run_search(ebay_query: str) -> list: def _run_search(ebay_query: str) -> list:
return _make_adapter(_make_shared_store(_shared_db), adapter, platform=platform).search(ebay_query, base_filters) return _make_adapter(Store(_shared_db), adapter, platform=platform).search(ebay_query, base_filters)
def _run_comps() -> None: def _run_comps() -> None:
try: try:
_make_adapter(_make_shared_store(_shared_db), adapter, platform=platform).get_completed_sales(comp_query, pages) _make_adapter(Store(_shared_db), adapter, platform=platform).get_completed_sales(comp_query, pages)
except Exception: except Exception:
log.warning("async comps: unhandled exception for %r", comp_query, exc_info=True) log.warning("async comps: unhandled exception for %r", comp_query, exc_info=True)
@ -1343,7 +1314,7 @@ def search_async(
platform, _auth_label(_user_id), _tier, adapter_used, pages, len(listings), q_norm, platform, _auth_label(_user_id), _tier, adapter_used, pages, len(listings), q_norm,
) )
shared_store = _make_shared_store(_shared_db) shared_store = Store(_shared_db)
user_store = Store(_user_db) user_store = Store(_user_db)
user_store.save_listings(listings) user_store.save_listings(listings)
@ -1502,7 +1473,7 @@ def enrich_seller(
""" """
import threading import threading
shared_store = _make_shared_store(session.shared_db) shared_store = Store(session.shared_db)
user_store = Store(session.user_db) user_store = Store(session.user_db)
shared_db = session.shared_db shared_db = session.shared_db
@ -1531,7 +1502,7 @@ def enrich_seller(
def _btf(): def _btf():
try: try:
ScrapedEbayAdapter(shared_store.clone()).enrich_sellers_btf( ScrapedEbayAdapter(Store(shared_db)).enrich_sellers_btf(
{seller: listing_id}, max_workers=1 {seller: listing_id}, max_workers=1
) )
except Exception as e: except Exception as e:
@ -1539,7 +1510,7 @@ def enrich_seller(
def _ssn(): def _ssn():
try: try:
ScrapedEbayAdapter(shared_store.clone()).enrich_sellers_categories( ScrapedEbayAdapter(Store(shared_db)).enrich_sellers_categories(
[seller], max_workers=1 [seller], max_workers=1
) )
except Exception as e: except Exception as e:
@ -1810,7 +1781,7 @@ class BlocklistAdd(BaseModel):
@app.get("/api/blocklist") @app.get("/api/blocklist")
def list_blocklist(session: CloudUser = Depends(get_session)): def list_blocklist(session: CloudUser = Depends(get_session)):
store = _make_shared_store(session.shared_db) store = Store(session.shared_db)
return {"entries": [dataclasses.asdict(e) for e in store.list_blocklist()]} return {"entries": [dataclasses.asdict(e) for e in store.list_blocklist()]}
@ -1821,7 +1792,7 @@ def add_to_blocklist(body: BlocklistAdd, session: CloudUser = Depends(get_sessio
status_code=403, status_code=403,
detail="Sign in to report sellers to the community blocklist.", detail="Sign in to report sellers to the community blocklist.",
) )
store = _make_shared_store(session.shared_db) store = Store(session.shared_db)
entry = store.add_to_blocklist(ScammerEntry( entry = store.add_to_blocklist(ScammerEntry(
platform=body.platform, platform=body.platform,
platform_seller_id=body.platform_seller_id, platform_seller_id=body.platform_seller_id,
@ -1855,13 +1826,13 @@ def add_to_blocklist(body: BlocklistAdd, session: CloudUser = Depends(get_sessio
@app.delete("/api/blocklist/{platform_seller_id}", status_code=204) @app.delete("/api/blocklist/{platform_seller_id}", status_code=204)
def remove_from_blocklist(platform_seller_id: str, session: CloudUser = Depends(get_session)): def remove_from_blocklist(platform_seller_id: str, session: CloudUser = Depends(get_session)):
_make_shared_store(session.shared_db).remove_from_blocklist("ebay", platform_seller_id) Store(session.shared_db).remove_from_blocklist("ebay", platform_seller_id)
@app.get("/api/blocklist/export") @app.get("/api/blocklist/export")
def export_blocklist(session: CloudUser = Depends(get_session)): def export_blocklist(session: CloudUser = Depends(get_session)):
"""Download the blocklist as a CSV file.""" """Download the blocklist as a CSV file."""
entries = _make_shared_store(session.shared_db).list_blocklist() entries = Store(session.shared_db).list_blocklist()
buf = io.StringIO() buf = io.StringIO()
writer = csv.writer(buf) writer = csv.writer(buf)
writer.writerow(["platform", "platform_seller_id", "username", "reason", "source", "created_at"]) writer.writerow(["platform", "platform_seller_id", "username", "reason", "source", "created_at"])
@ -1893,7 +1864,7 @@ async def import_blocklist(
except UnicodeDecodeError: except UnicodeDecodeError:
raise HTTPException(status_code=400, detail="File must be UTF-8 encoded") raise HTTPException(status_code=400, detail="File must be UTF-8 encoded")
store = _make_shared_store(session.shared_db) store = Store(session.shared_db)
imported = 0 imported = 0
errors: list[str] = [] errors: list[str] = []
reader = csv.DictReader(io.StringIO(text)) reader = csv.DictReader(io.StringIO(text))

View file

@ -1,49 +0,0 @@
-- Snipe shared tables: sellers, market_comps, reported_sellers
-- Replaces the equivalent tables in shared.db (SQLite).
-- Per-user tables (listings, trust_scores, saved_searches) remain in SQLite.
CREATE TABLE IF NOT EXISTS sellers (
id BIGSERIAL PRIMARY KEY,
platform TEXT NOT NULL,
platform_seller_id TEXT NOT NULL,
username TEXT NOT NULL,
account_age_days INTEGER,
feedback_count INTEGER NOT NULL DEFAULT 0,
feedback_ratio DOUBLE PRECISION NOT NULL DEFAULT 0,
category_history_json TEXT NOT NULL DEFAULT '{}',
fetched_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (platform, platform_seller_id)
);
CREATE TABLE IF NOT EXISTS market_comps (
id BIGSERIAL PRIMARY KEY,
platform TEXT NOT NULL,
query_hash TEXT NOT NULL,
median_price DOUBLE PRECISION NOT NULL,
sample_count INTEGER NOT NULL,
fetched_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
expires_at TIMESTAMPTZ NOT NULL,
UNIQUE (platform, query_hash)
);
CREATE TABLE IF NOT EXISTS reported_sellers (
id BIGSERIAL PRIMARY KEY,
platform TEXT NOT NULL,
platform_seller_id TEXT NOT NULL,
username TEXT,
reported_by TEXT NOT NULL DEFAULT 'user',
reported_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (platform, platform_seller_id)
);
CREATE TABLE IF NOT EXISTS scammer_blocklist (
id BIGSERIAL PRIMARY KEY,
platform TEXT NOT NULL,
platform_seller_id TEXT NOT NULL,
username TEXT NOT NULL,
reason TEXT,
source TEXT NOT NULL DEFAULT 'manual',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (platform, platform_seller_id)
);

View file

@ -1,380 +0,0 @@
from __future__ import annotations
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
import psycopg2
from psycopg2.pool import ThreadedConnectionPool
from app.db.models import MarketComp, ScammerEntry, Seller
log = logging.getLogger(__name__)
_MIN_CONN = 2
_MAX_CONN = 20
class SnipeSharedDB:
"""Thread-safe Postgres connection pool for Snipe shared tables."""
def __init__(self, dsn: str) -> None:
self._pool = ThreadedConnectionPool(_MIN_CONN, _MAX_CONN, dsn=dsn)
def getconn(self):
return self._pool.getconn()
def putconn(self, conn) -> None:
self._pool.putconn(conn)
def close(self) -> None:
self._pool.closeall()
def run_migrations(self) -> None:
"""Apply pg_migrations/*.sql in filename order. Idempotent."""
migrations_dir = Path(__file__).parent / "pg_migrations"
files = sorted(migrations_dir.glob("*.sql"), key=lambda p: p.name)
conn = self.getconn()
try:
with conn.cursor() as cur:
cur.execute("""
CREATE TABLE IF NOT EXISTS _snipe_shared_migrations (
filename TEXT PRIMARY KEY,
applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)
""")
conn.commit()
for f in files:
cur.execute(
"SELECT 1 FROM _snipe_shared_migrations WHERE filename = %s",
(f.name,),
)
if cur.fetchone():
continue
log.info("Applying migration: %s", f.name)
cur.execute(f.read_text())
cur.execute(
"INSERT INTO _snipe_shared_migrations (filename) VALUES (%s)",
(f.name,),
)
conn.commit()
except Exception:
conn.rollback()
raise
finally:
self.putconn(conn)
class SnipeSharedStore:
"""Postgres-backed store for sellers, market_comps, and reported_sellers.
Satisfies SharedTableProtocol. clone() returns self ThreadedConnectionPool
is already thread-safe, so no new instance is needed per thread.
"""
def __init__(self, db: SnipeSharedDB) -> None:
self._db = db
def clone(self) -> "SnipeSharedStore":
return self
# Sellers
def save_seller(self, seller: Seller) -> None:
self.save_sellers([seller])
def save_sellers(self, sellers: list[Seller]) -> None:
if not sellers:
return
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.executemany(
"""
INSERT INTO sellers
(platform, platform_seller_id, username, account_age_days,
feedback_count, feedback_ratio, category_history_json)
VALUES (%s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (platform, platform_seller_id) DO UPDATE SET
username = EXCLUDED.username,
feedback_count = EXCLUDED.feedback_count,
feedback_ratio = EXCLUDED.feedback_ratio,
account_age_days = COALESCE(
EXCLUDED.account_age_days,
sellers.account_age_days
),
category_history_json = COALESCE(
NULLIF(NULLIF(EXCLUDED.category_history_json, '{}'), ''),
NULLIF(NULLIF(sellers.category_history_json, '{}'), ''),
'{}'
),
fetched_at = NOW()
""",
[
(s.platform, s.platform_seller_id, s.username, s.account_age_days,
s.feedback_count, s.feedback_ratio, s.category_history_json or "{}")
for s in sellers
],
)
conn.commit()
except Exception:
conn.rollback()
raise
finally:
self._db.putconn(conn)
def get_seller(self, platform: str, platform_seller_id: str) -> Optional[Seller]:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"""
SELECT platform, platform_seller_id, username, account_age_days,
feedback_count, feedback_ratio, category_history_json,
id, fetched_at
FROM sellers
WHERE platform = %s AND platform_seller_id = %s
""",
(platform, platform_seller_id),
)
row = cur.fetchone()
if not row:
return None
return Seller(*row[:7], id=row[7], fetched_at=str(row[8]))
finally:
self._db.putconn(conn)
def delete_seller_data(self, platform: str, platform_seller_id: str) -> None:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"DELETE FROM sellers WHERE platform = %s AND platform_seller_id = %s",
(platform, platform_seller_id),
)
conn.commit()
except Exception:
conn.rollback()
raise
finally:
self._db.putconn(conn)
# MarketComps
def save_market_comp(self, comp: MarketComp) -> None:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO market_comps
(platform, query_hash, median_price, sample_count, expires_at)
VALUES (%s, %s, %s, %s, %s::TIMESTAMPTZ)
ON CONFLICT (platform, query_hash) DO UPDATE SET
median_price = EXCLUDED.median_price,
sample_count = EXCLUDED.sample_count,
expires_at = EXCLUDED.expires_at,
fetched_at = NOW()
""",
(comp.platform, comp.query_hash, comp.median_price,
comp.sample_count, comp.expires_at),
)
conn.commit()
except Exception:
conn.rollback()
raise
finally:
self._db.putconn(conn)
def get_market_comp(self, platform: str, query_hash: str) -> Optional[MarketComp]:
now = datetime.now(timezone.utc).isoformat()
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"""
SELECT platform, query_hash, median_price, sample_count,
expires_at, id, fetched_at
FROM market_comps
WHERE platform = %s AND query_hash = %s AND expires_at > %s::TIMESTAMPTZ
""",
(platform, query_hash, now),
)
row = cur.fetchone()
if not row:
return None
return MarketComp(*row[:5], id=row[5], fetched_at=str(row[6]))
finally:
self._db.putconn(conn)
# Reported Sellers
def mark_reported(
self,
platform: str,
platform_seller_id: str,
username: Optional[str] = None,
reported_by: str = "user",
) -> None:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO reported_sellers
(platform, platform_seller_id, username, reported_by)
VALUES (%s, %s, %s, %s)
ON CONFLICT (platform, platform_seller_id) DO NOTHING
""",
(platform, platform_seller_id, username, reported_by),
)
conn.commit()
except Exception:
conn.rollback()
raise
finally:
self._db.putconn(conn)
def list_reported(self, platform: str = "ebay") -> list[str]:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"SELECT platform_seller_id FROM reported_sellers WHERE platform = %s",
(platform,),
)
return [row[0] for row in cur.fetchall()]
finally:
self._db.putconn(conn)
# Seller Category Refresh
def refresh_seller_categories(
self,
platform: str,
seller_ids: list[str],
listing_store=None, # always a SQLite Store in practice
) -> int:
"""Derive category_history_json from listing data and update sellers in Postgres.
listing_store must be provided (it's always the per-user SQLite Store).
Returns count of sellers updated.
"""
from app.platforms.ebay.scraper import _classify_category_label # lazy to avoid circular
import json
if not seller_ids or listing_store is None:
return 0
updated = 0
for sid in seller_ids:
seller = self.get_seller(platform, sid)
if not seller or seller.category_history_json not in ("{}", "", None):
continue
# listing_store is always a SQLite Store; access _conn directly for the query.
rows = listing_store._conn.execute(
"SELECT category_name, COUNT(*) FROM listings "
"WHERE platform=? AND seller_platform_id=? AND category_name IS NOT NULL "
"GROUP BY category_name",
(platform, sid),
).fetchall()
if not rows:
continue
counts: dict[str, int] = {}
for cat_name, cnt in rows:
key = _classify_category_label(cat_name)
if key:
counts[key] = counts.get(key, 0) + cnt
if counts:
from dataclasses import replace
self.save_sellers([replace(seller, category_history_json=json.dumps(counts))])
updated += 1
return updated
# Scammer Blocklist
def is_blocklisted(self, platform: str, platform_seller_id: str) -> bool:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"SELECT 1 FROM scammer_blocklist "
"WHERE platform = %s AND platform_seller_id = %s LIMIT 1",
(platform, platform_seller_id),
)
return cur.fetchone() is not None
finally:
self._db.putconn(conn)
def add_to_blocklist(self, entry: ScammerEntry) -> ScammerEntry:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO scammer_blocklist
(platform, platform_seller_id, username, reason, source)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (platform, platform_seller_id) DO UPDATE SET
username = EXCLUDED.username,
reason = COALESCE(EXCLUDED.reason, scammer_blocklist.reason),
source = EXCLUDED.source
""",
(entry.platform, entry.platform_seller_id, entry.username,
entry.reason, entry.source),
)
conn.commit()
cur.execute(
"SELECT id, created_at FROM scammer_blocklist "
"WHERE platform = %s AND platform_seller_id = %s",
(entry.platform, entry.platform_seller_id),
)
row = cur.fetchone()
from dataclasses import replace
return replace(entry, id=row[0], created_at=str(row[1]))
except Exception:
conn.rollback()
raise
finally:
self._db.putconn(conn)
def remove_from_blocklist(self, platform: str, platform_seller_id: str) -> None:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"DELETE FROM scammer_blocklist "
"WHERE platform = %s AND platform_seller_id = %s",
(platform, platform_seller_id),
)
conn.commit()
except Exception:
conn.rollback()
raise
finally:
self._db.putconn(conn)
def list_blocklist(self, platform: str = "ebay") -> list[ScammerEntry]:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"""
SELECT platform, platform_seller_id, username, reason, source, id, created_at
FROM scammer_blocklist
WHERE platform = %s
ORDER BY created_at DESC
""",
(platform,),
)
return [
ScammerEntry(
platform=r[0], platform_seller_id=r[1], username=r[2],
reason=r[3], source=r[4], id=r[5], created_at=str(r[6]),
)
for r in cur.fetchall()
]
finally:
self._db.putconn(conn)

View file

@ -1,86 +0,0 @@
"""Protocol (duck-type interface) for shared table backends (SQLite and Postgres)."""
from __future__ import annotations
from typing import Any, Optional, Protocol, runtime_checkable
from app.db.models import MarketComp, ScammerEntry, Seller
@runtime_checkable
class SharedTableProtocol(Protocol):
"""Protocol that both Store (SQLite) and SnipeSharedStore (Postgres) must satisfy.
This enables code that reads/writes shared tables (sellers, market_comps,
reported_sellers, scammer_blocklist) to remain agnostic to the underlying backend.
"""
def save_seller(self, seller: Seller) -> None:
"""Persist a single seller record."""
...
def save_sellers(self, sellers: list[Seller]) -> None:
"""Persist multiple seller records (batch upsert)."""
...
def get_seller(self, platform: str, platform_seller_id: str) -> Optional[Seller]:
"""Fetch a single seller by platform and platform_seller_id."""
...
def save_market_comp(self, comp: MarketComp) -> None:
"""Persist a market comparison record."""
...
def get_market_comp(self, platform: str, query_hash: str) -> Optional[MarketComp]:
"""Fetch a market comparison by platform and query_hash."""
...
def mark_reported(
self,
platform: str,
platform_seller_id: str,
username: Optional[str] = None,
reported_by: str = "user",
) -> None:
"""Record that a seller has been reported to the platform."""
...
def list_reported(self, platform: str = "ebay") -> list[str]:
"""Return all platform_seller_ids that have been reported."""
...
def delete_seller_data(self, platform: str, platform_seller_id: str) -> None:
"""Permanently erase a seller and all related data (GDPR/eBay compliance)."""
...
def refresh_seller_categories(
self,
platform: str,
seller_ids: list[str],
listing_store: Optional[Any] = None,
) -> int:
"""Derive category_history_json for sellers that lack it from stored listings.
listing_store: Store holding listings (may differ from self in split-DB mode).
Returns count of sellers updated.
"""
...
def is_blocklisted(self, platform: str, platform_seller_id: str) -> bool:
"""Return True if a seller is on the community scammer blocklist."""
...
def add_to_blocklist(self, entry: ScammerEntry) -> ScammerEntry:
"""Upsert a seller into the blocklist. Returns the saved entry with id and created_at."""
...
def remove_from_blocklist(self, platform: str, platform_seller_id: str) -> None:
"""Remove a seller from the blocklist."""
...
def list_blocklist(self, platform: str = "ebay") -> list[ScammerEntry]:
"""Return all blocklisted sellers for a platform, newest first."""
...
def clone(self) -> SharedTableProtocol:
"""Create a new independent instance pointing to the same backend."""
...

View file

@ -21,10 +21,6 @@ class Store:
# WAL mode: allows concurrent readers + one writer without blocking # WAL mode: allows concurrent readers + one writer without blocking
self._conn.execute("PRAGMA journal_mode=WAL") self._conn.execute("PRAGMA journal_mode=WAL")
def clone(self) -> Store:
"""Create a new independent instance pointing to the same database."""
return Store(self._db_path)
# --- Seller --- # --- Seller ---
def delete_seller_data(self, platform: str, platform_seller_id: str) -> None: def delete_seller_data(self, platform: str, platform_seller_id: str) -> None:

View file

@ -6,7 +6,7 @@ Snipe LLMRouter shim — tri-level config path priority.
Config lookup order: Config lookup order:
1. <repo>/config/llm.yaml per-install local override 1. <repo>/config/llm.yaml per-install local override
2. ~/.config/circuitforge/llm.yaml user-level config (circuitforge-core default) 2. ~/.config/circuitforge/llm.yaml user-level config (circuitforge-core default)
3. env-var auto-config (ANTHROPIC_API_KEY, OPENAI_API_KEY, OLLAMA_HOST, GPU_SERVER_URL) 3. env-var auto-config (ANTHROPIC_API_KEY, OPENAI_API_KEY, OLLAMA_HOST, CF_ORCH_URL)
""" """
from pathlib import Path from pathlib import Path

View file

@ -22,7 +22,7 @@ _SHOPPING_API_INTER_REQUEST_DELAY = 0.5 # seconds between successive calls
_SELLER_ENRICH_TTL_HOURS = 24 # skip re-enrichment within this window _SELLER_ENRICH_TTL_HOURS = 24 # skip re-enrichment within this window
from app.db.models import Listing, MarketComp, Seller from app.db.models import Listing, MarketComp, Seller
from app.db.protocol import SharedTableProtocol from app.db.store import Store
from app.platforms import PlatformAdapter, SearchFilters from app.platforms import PlatformAdapter, SearchFilters
from app.platforms.ebay.auth import EbayTokenManager from app.platforms.ebay.auth import EbayTokenManager
from app.platforms.ebay.normaliser import normalise_listing, normalise_seller from app.platforms.ebay.normaliser import normalise_listing, normalise_seller
@ -67,7 +67,7 @@ BROWSE_BASE = {
class EbayAdapter(PlatformAdapter): class EbayAdapter(PlatformAdapter):
def __init__(self, token_manager: EbayTokenManager, shared_store: SharedTableProtocol, env: str = "production"): def __init__(self, token_manager: EbayTokenManager, shared_store: Store, env: str = "production"):
self._tokens = token_manager self._tokens = token_manager
self._store = shared_store self._store = shared_store
self._env = env self._env = env

View file

@ -25,7 +25,7 @@ log = logging.getLogger(__name__)
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from app.db.models import Listing, MarketComp, Seller from app.db.models import Listing, MarketComp, Seller
from app.db.protocol import SharedTableProtocol from app.db.store import Store
from app.platforms import PlatformAdapter, SearchFilters from app.platforms import PlatformAdapter, SearchFilters
EBAY_SEARCH_URL = "https://www.ebay.com/sch/i.html" EBAY_SEARCH_URL = "https://www.ebay.com/sch/i.html"
@ -286,7 +286,7 @@ class ScrapedEbayAdapter(PlatformAdapter):
category_history) cause TrustScorer to set score_is_partial=True. category_history) cause TrustScorer to set score_is_partial=True.
""" """
def __init__(self, shared_store: SharedTableProtocol, delay: float = 1.0): def __init__(self, shared_store: Store, delay: float = 1.0):
self._store = shared_store self._store = shared_store
self._delay = delay self._delay = delay
@ -374,6 +374,8 @@ class ScrapedEbayAdapter(PlatformAdapter):
Does not raise failures per-seller are silently skipped so the main Does not raise failures per-seller are silently skipped so the main
search response is never blocked. search response is never blocked.
""" """
db_path = self._store._db_path # capture for thread-local Store creation
def _enrich_one(item: tuple[str, str]) -> None: def _enrich_one(item: tuple[str, str]) -> None:
seller_id, listing_id = item seller_id, listing_id = item
try: try:
@ -386,7 +388,7 @@ class ScrapedEbayAdapter(PlatformAdapter):
) )
if age_days is None and fb_count is None: if age_days is None and fb_count is None:
return # nothing new to write return # nothing new to write
thread_store = self._store.clone() thread_store = Store(db_path)
seller = thread_store.get_seller("ebay", seller_id) seller = thread_store.get_seller("ebay", seller_id)
if not seller: if not seller:
log.warning("BTF enrich: seller %s not found in DB", seller_id) log.warning("BTF enrich: seller %s not found in DB", seller_id)

View file

@ -8,9 +8,9 @@ Current task types:
result to trust_scores.photo_analysis_json (Paid tier). result to trust_scores.photo_analysis_json (Paid tier).
Image assessment routing: Image assessment routing:
Cloud (GPU_SERVER_URL set): allocates via cf-orch task endpoint Cloud (CF_ORCH_URL set): allocates via cf-orch task endpoint
product=snipe, task=image_assessment. product=snipe, task=image_assessment.
Local (no GPU_SERVER_URL) or TaskNotFound fallback: uses LLMRouter Local (no CF_ORCH_URL) or TaskNotFound fallback: uses LLMRouter
with a vision-capable local backend (moondream2, llava, etc.). with a vision-capable local backend (moondream2, llava, etc.).
""" """
from __future__ import annotations from __future__ import annotations
@ -135,7 +135,7 @@ def _run_trust_photo_analysis(
if listing_title: if listing_title:
user_prompt = f"Assess this eBay listing image: {listing_title}" user_prompt = f"Assess this eBay listing image: {listing_title}"
cforch_url = os.getenv("GPU_SERVER_URL") or os.getenv("CF_ORCH_URL") cforch_url = os.getenv("CF_ORCH_URL")
if cforch_url: if cforch_url:
raw = _assess_via_orch(cforch_url, image_data_url, user_prompt) raw = _assess_via_orch(cforch_url, image_data_url, user_prompt)
else: else:

View file

@ -2,7 +2,7 @@ import hashlib
import math import math
from app.db.models import Listing, TrustScore from app.db.models import Listing, TrustScore
from app.db.protocol import SharedTableProtocol from app.db.store import Store
from .aggregator import Aggregator from .aggregator import Aggregator
from .metadata import MetadataScorer from .metadata import MetadataScorer
@ -12,7 +12,7 @@ from .photo import PhotoScorer
class TrustScorer: class TrustScorer:
"""Orchestrates metadata + photo scoring for a batch of listings.""" """Orchestrates metadata + photo scoring for a batch of listings."""
def __init__(self, shared_store: SharedTableProtocol): def __init__(self, shared_store: Store):
self._store = shared_store self._store = shared_store
self._meta = MetadataScorer() self._meta = MetadataScorer()
self._photo = PhotoScorer() self._photo = PhotoScorer()

View file

@ -20,12 +20,9 @@ services:
CLOUD_MODE: "true" CLOUD_MODE: "true"
CLOUD_DATA_ROOT: /devl/snipe-cloud-data CLOUD_DATA_ROOT: /devl/snipe-cloud-data
# DIRECTUS_JWT_SECRET, HEIMDALL_URL, HEIMDALL_ADMIN_TOKEN — set in .env (never commit) # DIRECTUS_JWT_SECRET, HEIMDALL_URL, HEIMDALL_ADMIN_TOKEN — set in .env (never commit)
# GPU_SERVER_URL routes LLM query builder through cf-orch for VRAM-aware scheduling. # CF_ORCH_URL routes LLM query builder through cf-orch for VRAM-aware scheduling.
# Override in .env to use a different coordinator URL. # Override in .env to use a different coordinator URL.
GPU_SERVER_URL: "http://host.docker.internal:7700" CF_ORCH_URL: "http://host.docker.internal:7700"
# SNIPE_SHARED_DB_URL — Postgres DSN for shared tables (sellers, market_comps, blocklist).
# Required for production multi-user deployments. Set in .env (never commit).
# SNIPE_SHARED_DB_URL: "postgresql://snipe:<password>@postgres:5432/snipe_shared"
CF_APP_NAME: snipe CF_APP_NAME: snipe
extra_hosts: extra_hosts:
- "host.docker.internal:host-gateway" - "host.docker.internal:host-gateway"

View file

@ -18,8 +18,8 @@ services:
environment: environment:
- RELOAD=true - RELOAD=true
# Point the LLM/vision task scheduler at the local cf-orch coordinator. # Point the LLM/vision task scheduler at the local cf-orch coordinator.
# Only has effect when GPU_SERVER_URL is set (uncomment in .env, or set inline). # Only has effect when CF_ORCH_URL is set (uncomment in .env, or set inline).
# - GPU_SERVER_URL=http://10.1.10.71:7700 # - CF_ORCH_URL=http://10.1.10.71:7700
# cf-orch agent — routes trust_photo_analysis vision tasks to the GPU coordinator. # cf-orch agent — routes trust_photo_analysis vision tasks to the GPU coordinator.
# Only starts when you pass --profile orch: # Only starts when you pass --profile orch:

View file

@ -6,7 +6,7 @@
# (claude_code, copilot) are intentionally excluded here. # (claude_code, copilot) are intentionally excluded here.
# #
# CF Orchestrator routes both ollama and vllm allocations for VRAM-aware # CF Orchestrator routes both ollama and vllm allocations for VRAM-aware
# scheduling. GPU_SERVER_URL must be set in .env for allocations to resolve; # scheduling. CF_ORCH_URL must be set in .env for allocations to resolve;
# if cf-orch is unreachable the backend falls back to its static base_url. # if cf-orch is unreachable the backend falls back to its static base_url.
# #
# Model choice for query builder: llama3.1:8b # Model choice for query builder: llama3.1:8b

View file

@ -8,7 +8,7 @@ version = "0.3.0"
description = "Auction listing monitor and trust scorer" description = "Auction listing monitor and trust scorer"
requires-python = ">=3.11" requires-python = ">=3.11"
dependencies = [ dependencies = [
"circuitforge-core[community]>=0.8.0", "circuitforge-core>=0.8.0",
"streamlit>=1.32", "streamlit>=1.32",
"requests>=2.31", "requests>=2.31",
"imagehash>=4.3", "imagehash>=4.3",
@ -24,15 +24,10 @@ dependencies = [
"cryptography>=42.0", "cryptography>=42.0",
"PyJWT>=2.8", "PyJWT>=2.8",
"httpx>=0.27", "httpx>=0.27",
"circuitforge-orch>=0.1.0",
] ]
[project.optional-dependencies] [project.optional-dependencies]
orchestration = [
# Paid+ tier only — not published to PyPI. Install from source or Forgejo Packages.
# pip install -e ../circuitforge-orch (dev)
# pip install snipe[orchestration] (self-hosted Paid+)
"circuitforge-orch>=0.1.0",
]
dev = [ dev = [
"pytest>=8.0", "pytest>=8.0",
"pytest-cov>=5.0", "pytest-cov>=5.0",

View file

@ -1,17 +0,0 @@
import os
import pytest
def pytest_configure(config):
config.addinivalue_line(
"markers",
"postgres: mark test as requiring a live Postgres instance (SNIPE_SHARED_DB_URL must be set)",
)
@pytest.fixture
def postgres_dsn():
dsn = os.environ.get("SNIPE_SHARED_DB_URL")
if not dsn:
pytest.skip("SNIPE_SHARED_DB_URL not set — skipping Postgres tests")
return dsn

View file

@ -1,157 +0,0 @@
"""Tests for SnipeSharedStore — requires live Postgres via SNIPE_SHARED_DB_URL."""
import pytest
from app.db.models import MarketComp, Seller
from app.db.pg_shared import SnipeSharedDB, SnipeSharedStore
from app.db.protocol import SharedTableProtocol
@pytest.mark.postgres
def test_snipe_shared_store_satisfies_protocol(postgres_dsn):
assert issubclass(SnipeSharedStore, SharedTableProtocol)
@pytest.mark.postgres
def test_save_and_get_seller(postgres_dsn):
db = SnipeSharedDB(postgres_dsn)
db.run_migrations()
store = SnipeSharedStore(db)
seller = Seller(
platform="ebay",
platform_seller_id="test-seller-001",
username="testseller",
account_age_days=365,
feedback_count=100,
feedback_ratio=0.99,
category_history_json='{"electronics": 5}',
)
store.save_seller(seller)
result = store.get_seller("ebay", "test-seller-001")
assert result is not None
assert result.username == "testseller"
assert result.feedback_count == 100
store.delete_seller_data("ebay", "test-seller-001")
db.close()
@pytest.mark.postgres
def test_save_sellers_coalesce_preserves_age(postgres_dsn):
db = SnipeSharedDB(postgres_dsn)
db.run_migrations()
store = SnipeSharedStore(db)
seller_with_age = Seller(
platform="ebay", platform_seller_id="coalesce-test",
username="u", account_age_days=730,
feedback_count=50, feedback_ratio=0.95, category_history_json="{}",
)
store.save_seller(seller_with_age)
seller_without_age = Seller(
platform="ebay", platform_seller_id="coalesce-test",
username="u", account_age_days=None,
feedback_count=60, feedback_ratio=0.96, category_history_json="{}",
)
store.save_sellers([seller_without_age])
result = store.get_seller("ebay", "coalesce-test")
assert result.account_age_days == 730
assert result.feedback_count == 60
store.delete_seller_data("ebay", "coalesce-test")
db.close()
@pytest.mark.postgres
def test_market_comp_cache(postgres_dsn):
from datetime import datetime, timedelta, timezone
db = SnipeSharedDB(postgres_dsn)
db.run_migrations()
store = SnipeSharedStore(db)
expires = (datetime.now(timezone.utc) + timedelta(hours=1)).isoformat()
comp = MarketComp(
platform="ebay", query_hash="abc123",
median_price=49.99, sample_count=10, expires_at=expires,
)
store.save_market_comp(comp)
result = store.get_market_comp("ebay", "abc123")
assert result is not None
assert result.median_price == 49.99
db.close()
@pytest.mark.postgres
def test_reported_sellers(postgres_dsn):
db = SnipeSharedDB(postgres_dsn)
db.run_migrations()
store = SnipeSharedStore(db)
store.mark_reported("ebay", "bad-seller-99", username="badguy")
reported = store.list_reported("ebay")
assert "bad-seller-99" in reported
store.mark_reported("ebay", "bad-seller-99") # idempotent
db.close()
@pytest.mark.postgres
def test_clone_returns_self(postgres_dsn):
db = SnipeSharedDB(postgres_dsn)
store = SnipeSharedStore(db)
assert store.clone() is store
db.close()
@pytest.mark.postgres
def test_blocklist_add_get_remove(postgres_dsn):
from app.db.models import ScammerEntry
db = SnipeSharedDB(postgres_dsn)
db.run_migrations()
store = SnipeSharedStore(db)
assert not store.is_blocklisted("ebay", "bad-999")
entry = store.add_to_blocklist(ScammerEntry(
platform="ebay", platform_seller_id="bad-999",
username="scammer1", reason="sold fakes", source="manual",
))
assert entry.id is not None
assert store.is_blocklisted("ebay", "bad-999")
entries = store.list_blocklist("ebay")
assert any(e.platform_seller_id == "bad-999" for e in entries)
store.remove_from_blocklist("ebay", "bad-999")
assert not store.is_blocklisted("ebay", "bad-999")
db.close()
@pytest.mark.postgres
def test_blocklist_upsert_is_idempotent(postgres_dsn):
from app.db.models import ScammerEntry
db = SnipeSharedDB(postgres_dsn)
db.run_migrations()
store = SnipeSharedStore(db)
store.add_to_blocklist(ScammerEntry(
platform="ebay", platform_seller_id="dup-test",
username="seller", reason="reason1", source="manual",
))
# Second add — should not raise, should update username but preserve reason via COALESCE
store.add_to_blocklist(ScammerEntry(
platform="ebay", platform_seller_id="dup-test",
username="seller_updated", reason=None, source="community",
))
entries = [e for e in store.list_blocklist("ebay") if e.platform_seller_id == "dup-test"]
assert len(entries) == 1
assert entries[0].username == "seller_updated"
assert entries[0].reason == "reason1" # COALESCE preserved original reason
store.remove_from_blocklist("ebay", "dup-test")
db.close()

View file

@ -1,39 +0,0 @@
"""Verify Store satisfies SharedTableProtocol at import time."""
from app.db.protocol import SharedTableProtocol
from app.db.store import Store
def test_store_satisfies_protocol():
assert issubclass(Store, SharedTableProtocol)
def test_store_clone_returns_new_instance(tmp_path):
db = tmp_path / "test.db"
s = Store(db)
clone = s.clone()
assert isinstance(clone, Store)
assert clone is not s
assert clone._db_path == db
def test_ebay_adapter_accepts_protocol():
from app.platforms.ebay.adapter import EbayAdapter
import tempfile
import pathlib
from unittest.mock import MagicMock
with tempfile.TemporaryDirectory() as tmp:
s = Store(pathlib.Path(tmp) / "t.db")
adapter = EbayAdapter(token_manager=MagicMock(), shared_store=s)
assert adapter._store is s
def test_scraped_adapter_no_db_path_ref():
from app.platforms.ebay.scraper import ScrapedEbayAdapter
import tempfile
import pathlib
with tempfile.TemporaryDirectory() as tmp:
s = Store(pathlib.Path(tmp) / "t.db")
adapter = ScrapedEbayAdapter(shared_store=s)
assert not hasattr(adapter, '_db_path_ref')

View file

@ -173,7 +173,7 @@ def _make_orch_client_mock(vision_json: str) -> MagicMock:
def test_run_task_photo_analysis_orch_success(tmp_db: Path): def test_run_task_photo_analysis_orch_success(tmp_db: Path):
"""Cloud path: CFOrchClient.task_allocate is used when GPU_SERVER_URL is set.""" """Cloud path: CFOrchClient.task_allocate is used when CF_ORCH_URL is set."""
task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS) task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS)
chat_resp = MagicMock() chat_resp = MagicMock()
@ -181,7 +181,7 @@ def test_run_task_photo_analysis_orch_success(tmp_db: Path):
chat_resp.raise_for_status = MagicMock() chat_resp.raise_for_status = MagicMock()
with patch("app.tasks.runner.requests") as mock_req, \ with patch("app.tasks.runner.requests") as mock_req, \
patch.dict("os.environ", {"GPU_SERVER_URL": "http://cf-orch.local:8700"}), \ patch.dict("os.environ", {"CF_ORCH_URL": "http://cf-orch.local:8700"}), \
patch("app.tasks.runner.httpx") as mock_httpx, \ patch("app.tasks.runner.httpx") as mock_httpx, \
patch("circuitforge_orch.client.CFOrchClient") as MockClient: patch("circuitforge_orch.client.CFOrchClient") as MockClient:
@ -216,7 +216,7 @@ def test_run_task_photo_analysis_orch_uses_image_assessment_task(tmp_db: Path):
chat_resp.raise_for_status = MagicMock() chat_resp.raise_for_status = MagicMock()
with patch("app.tasks.runner.requests") as mock_req, \ with patch("app.tasks.runner.requests") as mock_req, \
patch.dict("os.environ", {"GPU_SERVER_URL": "http://cf-orch.local:8700"}), \ patch.dict("os.environ", {"CF_ORCH_URL": "http://cf-orch.local:8700"}), \
patch("app.tasks.runner.httpx") as mock_httpx, \ patch("app.tasks.runner.httpx") as mock_httpx, \
patch("circuitforge_orch.client.CFOrchClient") as MockClient: patch("circuitforge_orch.client.CFOrchClient") as MockClient:
@ -248,7 +248,7 @@ def test_run_task_photo_analysis_orch_sends_image_url_content(tmp_db: Path):
return resp return resp
with patch("app.tasks.runner.requests") as mock_req, \ with patch("app.tasks.runner.requests") as mock_req, \
patch.dict("os.environ", {"GPU_SERVER_URL": "http://cf-orch.local:8700"}), \ patch.dict("os.environ", {"CF_ORCH_URL": "http://cf-orch.local:8700"}), \
patch("app.tasks.runner.httpx") as mock_httpx, \ patch("app.tasks.runner.httpx") as mock_httpx, \
patch("circuitforge_orch.client.CFOrchClient") as MockClient: patch("circuitforge_orch.client.CFOrchClient") as MockClient:
@ -282,7 +282,7 @@ def test_run_task_photo_analysis_orch_task_not_found_falls_back(tmp_db: Path):
client_instance.task_allocate.return_value = cm client_instance.task_allocate.return_value = cm
with patch("app.tasks.runner.requests") as mock_req, \ with patch("app.tasks.runner.requests") as mock_req, \
patch.dict("os.environ", {"GPU_SERVER_URL": "http://cf-orch.local:8700"}), \ patch.dict("os.environ", {"CF_ORCH_URL": "http://cf-orch.local:8700"}), \
patch("circuitforge_orch.client.CFOrchClient", return_value=client_instance), \ patch("circuitforge_orch.client.CFOrchClient", return_value=client_instance), \
patch("app.tasks.runner._assess_via_local_llm", return_value=_VISION_JSON) as mock_local: patch("app.tasks.runner._assess_via_local_llm", return_value=_VISION_JSON) as mock_local: