Compare commits

..

11 commits

Author SHA1 Message Date
9bd6d9513e docs: add LLM development disclosure to README
Some checks failed
CI / Python tests (push) Has been cancelled
CI / Frontend typecheck + tests (push) Has been cancelled
Mirror / mirror (push) Has been cancelled
Humans own design, architecture, code review, testing, and
verification. LLMs are part of our development workflow.
Links to circuitforge.tech/positions for our full position.
2026-05-28 08:20:17 -07:00
341d66d5f0 feat: migrate shared_db (sellers/market_comps/blocklist) from SQLite to Postgres (#45)
Three-layer migration: SQLite Store remains for per-user tables (listings,
trust_scores, background_tasks, community_signals). Postgres takes over
for all high-contention shared tables.

Closes: #45
2026-05-22 15:49:02 -07:00
e34c2b9982 feat(db): wire Postgres shared backend into main.py and extend protocol
SharedTableProtocol now covers the full shared-table surface:
  - sellers, market_comps, reported_sellers (already in SnipeSharedStore)
  - scammer_blocklist (new — is_blocklisted, add/remove/list_blocklist)
  - refresh_seller_categories (reads per-user SQLite, writes to Postgres)

TrustScorer updated to accept SharedTableProtocol (was Store).

api/main.py:
  - _pg_shared_store global + _make_shared_store(path) helper
  - Lifespan init: SNIPE_SHARED_DB_URL → SnipeSharedDB + SnipeSharedStore
  - All Store(shared_db) calls for shared tables replaced with
    _make_shared_store(shared_db) or shared_store.clone()
  - Blocklist endpoints use _make_shared_store (Postgres when configured)
  - Community signals stay SQLite-only (low-write, not in protocol)

Postgres migration 001: scammer_blocklist table added.
8 blocklist tests added (gated behind SNIPE_SHARED_DB_URL / @pytest.mark.postgres).
.env.example: SNIPE_SHARED_DB_URL documented.
compose.cloud.yml: GPU_SERVER_URL + SNIPE_SHARED_DB_URL comment added.

248 passed, 8 skipped (postgres-gated).

Closes: #45
2026-05-22 15:47:36 -07:00
cc997c09e3 refactor: rename CF_ORCH_URL → GPU_SERVER_URL (backward-compat alias kept)
GPU_SERVER_URL is the self-explanatory name a self-hoster can understand
without knowing CircuitForge internals. CF_ORCH_URL continues to work as
a drop-in fallback alias (runner.py, main.py both check GPU_SERVER_URL
first, then CF_ORCH_URL).

Updated everywhere the env var is referenced or documented:
- app/tasks/runner.py
- api/main.py
- app/llm/router.py
- .env.example (alias note added)
- compose.override.yml
- compose.cloud.yml
- config/llm.cloud.yaml
- tests/test_tasks/test_runner.py (primary key updated; 13/13 still pass)

Follows the GPU_SERVER_URL convention established in kiwi (see kiwi
app/core/config.py).

Closes: #55
2026-05-21 15:05:27 -07:00
c10a481ce3 chore: move circuitforge-orch to optional extras group
Free-tier users get a clean `pip install snipe` (or pip install -e .)
without hitting a resolution error for circuitforge-orch, which is not
on PyPI and is a Paid+ feature.

Runtime tier gate in runner.py / main.py already handles the missing-
package case gracefully (falls back to LLMRouter when GPU_SERVER_URL
is unset). Install-time gating was a violation of the CF MIT boundary.

Upgrade path: pip install snipe[orchestration]

Closes: #56
2026-05-21 15:05:11 -07:00
80ac13e69f refactor(adapters): accept SharedTableProtocol; replace thread-local Store pattern with clone() 2026-05-18 09:12:00 -07:00
9d8b627fe1 fix(db): remove redundant _snipe_shared_migrations DDL from SQL file (runner owns it) 2026-05-18 09:09:35 -07:00
1d6556072f feat(db): SnipeSharedStore — Postgres backend for sellers, market_comps, reported_sellers
Implements SharedTableProtocol against a ThreadedConnectionPool (psycopg2).
SnipeSharedDB handles pool lifecycle and idempotent SQL migrations.
save_sellers uses COALESCE to preserve existing account_age_days when the
new record omits it. All 6 Postgres tests skip cleanly without SNIPE_SHARED_DB_URL.
2026-05-18 09:07:32 -07:00
78809c761e feat(db): SharedTableProtocol + Store.clone() for dual-backend support 2026-05-18 08:53:47 -07:00
6fbcf90740 feat(db): Postgres schema for shared sellers, market_comps, reported_sellers 2026-05-18 08:31:11 -07:00
5ddfbece8e chore(deps): add psycopg2-binary for shared Postgres migration 2026-05-18 08:30:34 -07:00
22 changed files with 832 additions and 53 deletions

View file

@ -98,16 +98,25 @@ CF_APP_NAME=snipe
# OLLAMA_HOST=http://localhost:11434
# OLLAMA_MODEL=llava:7b
# CF Orchestrator — routes vision/LLM tasks to a cf-orch coordinator for VRAM management.
# GPU Server — routes vision/LLM tasks to a cf-orch coordinator for VRAM management.
# Self-hosted: point at a local cf-orch coordinator if you have one running.
# Cloud (internal): managed coordinator at orch.circuitforge.tech.
# Leave unset to run vision tasks inline (no VRAM coordination).
# CF_ORCH_URL=http://10.1.10.71:7700
# GPU_SERVER_URL=http://10.1.10.71:7700
#
# CF_ORCH_URL is accepted as a backward-compat alias for GPU_SERVER_URL.
#
# cf-orch agent (compose --profile orch) — coordinator URL for the sidecar agent.
# Defaults to CF_ORCH_URL if unset.
# Defaults to GPU_SERVER_URL if unset.
# CF_ORCH_COORDINATOR_URL=http://10.1.10.71:7700
# ── Shared Postgres (optional — strongly recommended for cloud/multi-user) ────
# When set, sellers, market_comps, reported_sellers, and scammer_blocklist are
# stored in Postgres instead of SQLite. Required to avoid database-locked errors
# under concurrent load (>10 simultaneous search users).
# Cloud instances: set to the cf-postgres DSN. Self-hosted: leave unset for SQLite.
# SNIPE_SHARED_DB_URL=postgresql://snipe:<password>@localhost:5432/snipe_shared
# ── Community DB (optional) ──────────────────────────────────────────────────
# When set, seller trust signals (confirmed scammers added to blocklist) are
# published to the shared community PostgreSQL for cross-user signal aggregation.

View file

@ -5,6 +5,7 @@ WORKDIR /app
# System deps for Playwright/Chromium
RUN apt-get update && apt-get install -y --no-install-recommends \
xvfb \
libpq-dev \
&& rm -rf /var/lib/apt/lists/*
# Install circuitforge-core from sibling directory (compose sets context: ..)

View file

@ -174,6 +174,8 @@ Snipe uses a dual license:
| Discovery pipeline — scrapers, platform adapters, search, keyword filtering | [MIT](LICENSE-MIT) |
| LLM trust-scoring, query builder, vision assessment, AI features | [BSL 1.1](LICENSE-BSL) — free for personal non-commercial self-hosting; commercial use requires a paid license; converts to MIT after 4 years |
Humans own design, architecture, code review, testing, and verification. LLMs are part of our development workflow. [Our positions on LLM use →](https://circuitforge.tech/positions)
Privacy · Safety · Accessibility — co-equal, non-negotiable.
[circuitforge.tech](https://circuitforge.tech)

View file

@ -33,6 +33,7 @@ from api.cloud_session import CloudUser, compute_features, get_session
from api.ebay_webhook import router as ebay_webhook_router
from app.db.models import SavedSearch as SavedSearchModel
from app.db.models import ScammerEntry
from app.db.protocol import SharedTableProtocol
from app.db.store import Store
from app.platforms import SUPPORTED_PLATFORMS, SearchFilters
from app.platforms.ebay.adapter import EbayAdapter
@ -142,6 +143,19 @@ def _get_community_store() -> "SnipeCommunityStore | None":
return _community_store
# ── Shared Postgres backend (optional — active when SNIPE_SHARED_DB_URL is set) ─
# Replaces the SQLite shared.db for sellers, market_comps, reported_sellers, and
# scammer_blocklist. ThreadedConnectionPool is thread-safe; one instance per process.
_pg_shared_store: "SharedTableProtocol | None" = None
def _make_shared_store(path: Path) -> SharedTableProtocol:
"""Return the active shared backend — Postgres if configured, SQLite otherwise."""
if _pg_shared_store is not None:
return _pg_shared_store
return Store(path)
# ── LLM Query Builder singletons (optional — requires LLM backend) ────────────
_category_cache = None
_query_translator = None
@ -153,7 +167,7 @@ def _get_query_translator():
@asynccontextmanager
async def _lifespan(app: FastAPI):
global _community_store
global _community_store, _pg_shared_store
# Pre-warm the Chromium browser pool so the first scrape request does not
# pay the full cold-start cost (5-10s Xvfb + browser launch).
# Pool size is controlled via BROWSER_POOL_SIZE env var (default: 2).
@ -178,6 +192,21 @@ async def _lifespan(app: FastAPI):
get_scheduler(sched_db)
log.info("Snipe task scheduler started (db=%s)", sched_db)
# Shared Postgres backend — optional. Replaces SQLite for sellers, market_comps,
# reported_sellers, and scammer_blocklist under concurrent load.
snipe_shared_dsn = os.environ.get("SNIPE_SHARED_DB_URL", "")
if snipe_shared_dsn:
try:
from app.db.pg_shared import SnipeSharedDB, SnipeSharedStore as _SnipeSharedStore
_pg_db = SnipeSharedDB(snipe_shared_dsn)
_pg_db.run_migrations()
_pg_shared_store = _SnipeSharedStore(_pg_db)
log.info("Shared Postgres backend ready (sellers, market_comps, blocklist)")
except Exception:
log.exception(
"SNIPE_SHARED_DB_URL set but Postgres init failed — falling back to SQLite"
)
# Community DB — optional. Skipped gracefully if COMMUNITY_DB_URL is unset.
community_db_url = os.environ.get("COMMUNITY_DB_URL", "")
if community_db_url:
@ -209,7 +238,7 @@ async def _lifespan(app: FastAPI):
_category_cache.refresh(token_manager=None) # bootstrap fallback
try:
cforch_url = os.getenv("CF_ORCH_URL") or None
cforch_url = os.getenv("GPU_SERVER_URL") or os.getenv("CF_ORCH_URL") or None
if cforch_url:
_query_translator = QueryTranslator(
category_cache=_category_cache,
@ -446,7 +475,7 @@ def session_info(response: Response, session: CloudUser = Depends(get_session)):
def _trigger_scraper_enrichment(
listings: list,
shared_store: Store,
shared_store: SharedTableProtocol,
shared_db: Path,
user_db: Path | None = None,
query: str = "",
@ -512,7 +541,7 @@ def _trigger_scraper_enrichment(
if not session_id or session_id not in _update_queues:
return
q = _update_queues[session_id]
thread_shared = Store(shared_db)
thread_shared = shared_store.clone()
thread_user = Store(user_db or shared_db)
scorer = TrustScorer(thread_shared)
comp = thread_shared.get_market_comp("ebay", hashlib.md5(query.encode()).hexdigest())
@ -538,7 +567,7 @@ def _trigger_scraper_enrichment(
def _run():
try:
enricher = ScrapedEbayAdapter(Store(shared_db))
enricher = ScrapedEbayAdapter(shared_store.clone())
if needs_btf:
enricher.enrich_sellers_btf(needs_btf, max_workers=2)
log.info("BTF enrichment complete for %d sellers", len(needs_btf))
@ -812,7 +841,7 @@ def search(
_update_queues[session_id] = _queue.SimpleQueue()
try:
shared_store = Store(shared_db)
shared_store = _make_shared_store(shared_db)
user_store = Store(user_db)
# Re-hydrate Listing dataclass instances from the cached dicts so the
@ -897,13 +926,14 @@ def search(
_evict_expired_cache()
log.info("cache: miss key=%s q=%r", cache_key, q)
# Each thread creates its own Store — sqlite3 check_same_thread=True.
# Each thread creates its own store via clone() — sqlite3 check_same_thread=True;
# SnipeSharedStore.clone() returns self (ThreadedConnectionPool is thread-safe).
def _run_search(ebay_query: str) -> list:
return _make_adapter(Store(shared_db), adapter, platform=platform).search(ebay_query, base_filters)
return _make_adapter(_make_shared_store(shared_db), adapter, platform=platform).search(ebay_query, base_filters)
def _run_comps() -> None:
try:
_make_adapter(Store(shared_db), adapter, platform=platform).get_completed_sales(comp_query, pages)
_make_adapter(_make_shared_store(shared_db), adapter, platform=platform).get_completed_sales(comp_query, pages)
except Exception:
log.warning("comps: unhandled exception for %r", comp_query, exc_info=True)
@ -944,10 +974,9 @@ def search(
_update_queues[session_id] = _queue.SimpleQueue()
try:
# Main-thread stores — fresh connections, same thread.
# shared_store: sellers, market_comps (all users share this data)
# user_store: listings, saved_searches (per-user in cloud mode, same file in local mode)
shared_store = Store(shared_db)
# Main-thread stores — shared_store may be Postgres (sellers, market_comps);
# user_store is always per-user SQLite (listings, trust_scores, saved_searches).
shared_store = _make_shared_store(shared_db)
user_store = Store(user_db)
user_store.save_listings(listings)
@ -1207,7 +1236,7 @@ def search_async(
cached_listings_raw = payload["listings"]
cached_market_price = payload["market_price"]
try:
shared_store = Store(_shared_db)
shared_store = _make_shared_store(_shared_db)
user_store = Store(_user_db)
listings = [_Listing(**d) for d in cached_listings_raw]
user_store.save_listings(listings)
@ -1287,11 +1316,11 @@ def search_async(
try:
def _run_search(ebay_query: str) -> list:
return _make_adapter(Store(_shared_db), adapter, platform=platform).search(ebay_query, base_filters)
return _make_adapter(_make_shared_store(_shared_db), adapter, platform=platform).search(ebay_query, base_filters)
def _run_comps() -> None:
try:
_make_adapter(Store(_shared_db), adapter, platform=platform).get_completed_sales(comp_query, pages)
_make_adapter(_make_shared_store(_shared_db), adapter, platform=platform).get_completed_sales(comp_query, pages)
except Exception:
log.warning("async comps: unhandled exception for %r", comp_query, exc_info=True)
@ -1314,7 +1343,7 @@ def search_async(
platform, _auth_label(_user_id), _tier, adapter_used, pages, len(listings), q_norm,
)
shared_store = Store(_shared_db)
shared_store = _make_shared_store(_shared_db)
user_store = Store(_user_db)
user_store.save_listings(listings)
@ -1473,7 +1502,7 @@ def enrich_seller(
"""
import threading
shared_store = Store(session.shared_db)
shared_store = _make_shared_store(session.shared_db)
user_store = Store(session.user_db)
shared_db = session.shared_db
@ -1502,7 +1531,7 @@ def enrich_seller(
def _btf():
try:
ScrapedEbayAdapter(Store(shared_db)).enrich_sellers_btf(
ScrapedEbayAdapter(shared_store.clone()).enrich_sellers_btf(
{seller: listing_id}, max_workers=1
)
except Exception as e:
@ -1510,7 +1539,7 @@ def enrich_seller(
def _ssn():
try:
ScrapedEbayAdapter(Store(shared_db)).enrich_sellers_categories(
ScrapedEbayAdapter(shared_store.clone()).enrich_sellers_categories(
[seller], max_workers=1
)
except Exception as e:
@ -1781,7 +1810,7 @@ class BlocklistAdd(BaseModel):
@app.get("/api/blocklist")
def list_blocklist(session: CloudUser = Depends(get_session)):
store = Store(session.shared_db)
store = _make_shared_store(session.shared_db)
return {"entries": [dataclasses.asdict(e) for e in store.list_blocklist()]}
@ -1792,7 +1821,7 @@ def add_to_blocklist(body: BlocklistAdd, session: CloudUser = Depends(get_sessio
status_code=403,
detail="Sign in to report sellers to the community blocklist.",
)
store = Store(session.shared_db)
store = _make_shared_store(session.shared_db)
entry = store.add_to_blocklist(ScammerEntry(
platform=body.platform,
platform_seller_id=body.platform_seller_id,
@ -1826,13 +1855,13 @@ def add_to_blocklist(body: BlocklistAdd, session: CloudUser = Depends(get_sessio
@app.delete("/api/blocklist/{platform_seller_id}", status_code=204)
def remove_from_blocklist(platform_seller_id: str, session: CloudUser = Depends(get_session)):
Store(session.shared_db).remove_from_blocklist("ebay", platform_seller_id)
_make_shared_store(session.shared_db).remove_from_blocklist("ebay", platform_seller_id)
@app.get("/api/blocklist/export")
def export_blocklist(session: CloudUser = Depends(get_session)):
"""Download the blocklist as a CSV file."""
entries = Store(session.shared_db).list_blocklist()
entries = _make_shared_store(session.shared_db).list_blocklist()
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerow(["platform", "platform_seller_id", "username", "reason", "source", "created_at"])
@ -1864,7 +1893,7 @@ async def import_blocklist(
except UnicodeDecodeError:
raise HTTPException(status_code=400, detail="File must be UTF-8 encoded")
store = Store(session.shared_db)
store = _make_shared_store(session.shared_db)
imported = 0
errors: list[str] = []
reader = csv.DictReader(io.StringIO(text))

View file

@ -0,0 +1,49 @@
-- Snipe shared tables: sellers, market_comps, reported_sellers
-- Replaces the equivalent tables in shared.db (SQLite).
-- Per-user tables (listings, trust_scores, saved_searches) remain in SQLite.
CREATE TABLE IF NOT EXISTS sellers (
id BIGSERIAL PRIMARY KEY,
platform TEXT NOT NULL,
platform_seller_id TEXT NOT NULL,
username TEXT NOT NULL,
account_age_days INTEGER,
feedback_count INTEGER NOT NULL DEFAULT 0,
feedback_ratio DOUBLE PRECISION NOT NULL DEFAULT 0,
category_history_json TEXT NOT NULL DEFAULT '{}',
fetched_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (platform, platform_seller_id)
);
CREATE TABLE IF NOT EXISTS market_comps (
id BIGSERIAL PRIMARY KEY,
platform TEXT NOT NULL,
query_hash TEXT NOT NULL,
median_price DOUBLE PRECISION NOT NULL,
sample_count INTEGER NOT NULL,
fetched_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
expires_at TIMESTAMPTZ NOT NULL,
UNIQUE (platform, query_hash)
);
CREATE TABLE IF NOT EXISTS reported_sellers (
id BIGSERIAL PRIMARY KEY,
platform TEXT NOT NULL,
platform_seller_id TEXT NOT NULL,
username TEXT,
reported_by TEXT NOT NULL DEFAULT 'user',
reported_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (platform, platform_seller_id)
);
CREATE TABLE IF NOT EXISTS scammer_blocklist (
id BIGSERIAL PRIMARY KEY,
platform TEXT NOT NULL,
platform_seller_id TEXT NOT NULL,
username TEXT NOT NULL,
reason TEXT,
source TEXT NOT NULL DEFAULT 'manual',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (platform, platform_seller_id)
);

View file

380
app/db/pg_shared.py Normal file
View file

@ -0,0 +1,380 @@
from __future__ import annotations
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
import psycopg2
from psycopg2.pool import ThreadedConnectionPool
from app.db.models import MarketComp, ScammerEntry, Seller
log = logging.getLogger(__name__)
_MIN_CONN = 2
_MAX_CONN = 20
class SnipeSharedDB:
"""Thread-safe Postgres connection pool for Snipe shared tables."""
def __init__(self, dsn: str) -> None:
self._pool = ThreadedConnectionPool(_MIN_CONN, _MAX_CONN, dsn=dsn)
def getconn(self):
return self._pool.getconn()
def putconn(self, conn) -> None:
self._pool.putconn(conn)
def close(self) -> None:
self._pool.closeall()
def run_migrations(self) -> None:
"""Apply pg_migrations/*.sql in filename order. Idempotent."""
migrations_dir = Path(__file__).parent / "pg_migrations"
files = sorted(migrations_dir.glob("*.sql"), key=lambda p: p.name)
conn = self.getconn()
try:
with conn.cursor() as cur:
cur.execute("""
CREATE TABLE IF NOT EXISTS _snipe_shared_migrations (
filename TEXT PRIMARY KEY,
applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)
""")
conn.commit()
for f in files:
cur.execute(
"SELECT 1 FROM _snipe_shared_migrations WHERE filename = %s",
(f.name,),
)
if cur.fetchone():
continue
log.info("Applying migration: %s", f.name)
cur.execute(f.read_text())
cur.execute(
"INSERT INTO _snipe_shared_migrations (filename) VALUES (%s)",
(f.name,),
)
conn.commit()
except Exception:
conn.rollback()
raise
finally:
self.putconn(conn)
class SnipeSharedStore:
"""Postgres-backed store for sellers, market_comps, and reported_sellers.
Satisfies SharedTableProtocol. clone() returns self ThreadedConnectionPool
is already thread-safe, so no new instance is needed per thread.
"""
def __init__(self, db: SnipeSharedDB) -> None:
self._db = db
def clone(self) -> "SnipeSharedStore":
return self
# Sellers
def save_seller(self, seller: Seller) -> None:
self.save_sellers([seller])
def save_sellers(self, sellers: list[Seller]) -> None:
if not sellers:
return
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.executemany(
"""
INSERT INTO sellers
(platform, platform_seller_id, username, account_age_days,
feedback_count, feedback_ratio, category_history_json)
VALUES (%s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (platform, platform_seller_id) DO UPDATE SET
username = EXCLUDED.username,
feedback_count = EXCLUDED.feedback_count,
feedback_ratio = EXCLUDED.feedback_ratio,
account_age_days = COALESCE(
EXCLUDED.account_age_days,
sellers.account_age_days
),
category_history_json = COALESCE(
NULLIF(NULLIF(EXCLUDED.category_history_json, '{}'), ''),
NULLIF(NULLIF(sellers.category_history_json, '{}'), ''),
'{}'
),
fetched_at = NOW()
""",
[
(s.platform, s.platform_seller_id, s.username, s.account_age_days,
s.feedback_count, s.feedback_ratio, s.category_history_json or "{}")
for s in sellers
],
)
conn.commit()
except Exception:
conn.rollback()
raise
finally:
self._db.putconn(conn)
def get_seller(self, platform: str, platform_seller_id: str) -> Optional[Seller]:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"""
SELECT platform, platform_seller_id, username, account_age_days,
feedback_count, feedback_ratio, category_history_json,
id, fetched_at
FROM sellers
WHERE platform = %s AND platform_seller_id = %s
""",
(platform, platform_seller_id),
)
row = cur.fetchone()
if not row:
return None
return Seller(*row[:7], id=row[7], fetched_at=str(row[8]))
finally:
self._db.putconn(conn)
def delete_seller_data(self, platform: str, platform_seller_id: str) -> None:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"DELETE FROM sellers WHERE platform = %s AND platform_seller_id = %s",
(platform, platform_seller_id),
)
conn.commit()
except Exception:
conn.rollback()
raise
finally:
self._db.putconn(conn)
# MarketComps
def save_market_comp(self, comp: MarketComp) -> None:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO market_comps
(platform, query_hash, median_price, sample_count, expires_at)
VALUES (%s, %s, %s, %s, %s::TIMESTAMPTZ)
ON CONFLICT (platform, query_hash) DO UPDATE SET
median_price = EXCLUDED.median_price,
sample_count = EXCLUDED.sample_count,
expires_at = EXCLUDED.expires_at,
fetched_at = NOW()
""",
(comp.platform, comp.query_hash, comp.median_price,
comp.sample_count, comp.expires_at),
)
conn.commit()
except Exception:
conn.rollback()
raise
finally:
self._db.putconn(conn)
def get_market_comp(self, platform: str, query_hash: str) -> Optional[MarketComp]:
now = datetime.now(timezone.utc).isoformat()
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"""
SELECT platform, query_hash, median_price, sample_count,
expires_at, id, fetched_at
FROM market_comps
WHERE platform = %s AND query_hash = %s AND expires_at > %s::TIMESTAMPTZ
""",
(platform, query_hash, now),
)
row = cur.fetchone()
if not row:
return None
return MarketComp(*row[:5], id=row[5], fetched_at=str(row[6]))
finally:
self._db.putconn(conn)
# Reported Sellers
def mark_reported(
self,
platform: str,
platform_seller_id: str,
username: Optional[str] = None,
reported_by: str = "user",
) -> None:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO reported_sellers
(platform, platform_seller_id, username, reported_by)
VALUES (%s, %s, %s, %s)
ON CONFLICT (platform, platform_seller_id) DO NOTHING
""",
(platform, platform_seller_id, username, reported_by),
)
conn.commit()
except Exception:
conn.rollback()
raise
finally:
self._db.putconn(conn)
def list_reported(self, platform: str = "ebay") -> list[str]:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"SELECT platform_seller_id FROM reported_sellers WHERE platform = %s",
(platform,),
)
return [row[0] for row in cur.fetchall()]
finally:
self._db.putconn(conn)
# Seller Category Refresh
def refresh_seller_categories(
self,
platform: str,
seller_ids: list[str],
listing_store=None, # always a SQLite Store in practice
) -> int:
"""Derive category_history_json from listing data and update sellers in Postgres.
listing_store must be provided (it's always the per-user SQLite Store).
Returns count of sellers updated.
"""
from app.platforms.ebay.scraper import _classify_category_label # lazy to avoid circular
import json
if not seller_ids or listing_store is None:
return 0
updated = 0
for sid in seller_ids:
seller = self.get_seller(platform, sid)
if not seller or seller.category_history_json not in ("{}", "", None):
continue
# listing_store is always a SQLite Store; access _conn directly for the query.
rows = listing_store._conn.execute(
"SELECT category_name, COUNT(*) FROM listings "
"WHERE platform=? AND seller_platform_id=? AND category_name IS NOT NULL "
"GROUP BY category_name",
(platform, sid),
).fetchall()
if not rows:
continue
counts: dict[str, int] = {}
for cat_name, cnt in rows:
key = _classify_category_label(cat_name)
if key:
counts[key] = counts.get(key, 0) + cnt
if counts:
from dataclasses import replace
self.save_sellers([replace(seller, category_history_json=json.dumps(counts))])
updated += 1
return updated
# Scammer Blocklist
def is_blocklisted(self, platform: str, platform_seller_id: str) -> bool:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"SELECT 1 FROM scammer_blocklist "
"WHERE platform = %s AND platform_seller_id = %s LIMIT 1",
(platform, platform_seller_id),
)
return cur.fetchone() is not None
finally:
self._db.putconn(conn)
def add_to_blocklist(self, entry: ScammerEntry) -> ScammerEntry:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO scammer_blocklist
(platform, platform_seller_id, username, reason, source)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (platform, platform_seller_id) DO UPDATE SET
username = EXCLUDED.username,
reason = COALESCE(EXCLUDED.reason, scammer_blocklist.reason),
source = EXCLUDED.source
""",
(entry.platform, entry.platform_seller_id, entry.username,
entry.reason, entry.source),
)
conn.commit()
cur.execute(
"SELECT id, created_at FROM scammer_blocklist "
"WHERE platform = %s AND platform_seller_id = %s",
(entry.platform, entry.platform_seller_id),
)
row = cur.fetchone()
from dataclasses import replace
return replace(entry, id=row[0], created_at=str(row[1]))
except Exception:
conn.rollback()
raise
finally:
self._db.putconn(conn)
def remove_from_blocklist(self, platform: str, platform_seller_id: str) -> None:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"DELETE FROM scammer_blocklist "
"WHERE platform = %s AND platform_seller_id = %s",
(platform, platform_seller_id),
)
conn.commit()
except Exception:
conn.rollback()
raise
finally:
self._db.putconn(conn)
def list_blocklist(self, platform: str = "ebay") -> list[ScammerEntry]:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"""
SELECT platform, platform_seller_id, username, reason, source, id, created_at
FROM scammer_blocklist
WHERE platform = %s
ORDER BY created_at DESC
""",
(platform,),
)
return [
ScammerEntry(
platform=r[0], platform_seller_id=r[1], username=r[2],
reason=r[3], source=r[4], id=r[5], created_at=str(r[6]),
)
for r in cur.fetchall()
]
finally:
self._db.putconn(conn)

86
app/db/protocol.py Normal file
View file

@ -0,0 +1,86 @@
"""Protocol (duck-type interface) for shared table backends (SQLite and Postgres)."""
from __future__ import annotations
from typing import Any, Optional, Protocol, runtime_checkable
from app.db.models import MarketComp, ScammerEntry, Seller
@runtime_checkable
class SharedTableProtocol(Protocol):
"""Protocol that both Store (SQLite) and SnipeSharedStore (Postgres) must satisfy.
This enables code that reads/writes shared tables (sellers, market_comps,
reported_sellers, scammer_blocklist) to remain agnostic to the underlying backend.
"""
def save_seller(self, seller: Seller) -> None:
"""Persist a single seller record."""
...
def save_sellers(self, sellers: list[Seller]) -> None:
"""Persist multiple seller records (batch upsert)."""
...
def get_seller(self, platform: str, platform_seller_id: str) -> Optional[Seller]:
"""Fetch a single seller by platform and platform_seller_id."""
...
def save_market_comp(self, comp: MarketComp) -> None:
"""Persist a market comparison record."""
...
def get_market_comp(self, platform: str, query_hash: str) -> Optional[MarketComp]:
"""Fetch a market comparison by platform and query_hash."""
...
def mark_reported(
self,
platform: str,
platform_seller_id: str,
username: Optional[str] = None,
reported_by: str = "user",
) -> None:
"""Record that a seller has been reported to the platform."""
...
def list_reported(self, platform: str = "ebay") -> list[str]:
"""Return all platform_seller_ids that have been reported."""
...
def delete_seller_data(self, platform: str, platform_seller_id: str) -> None:
"""Permanently erase a seller and all related data (GDPR/eBay compliance)."""
...
def refresh_seller_categories(
self,
platform: str,
seller_ids: list[str],
listing_store: Optional[Any] = None,
) -> int:
"""Derive category_history_json for sellers that lack it from stored listings.
listing_store: Store holding listings (may differ from self in split-DB mode).
Returns count of sellers updated.
"""
...
def is_blocklisted(self, platform: str, platform_seller_id: str) -> bool:
"""Return True if a seller is on the community scammer blocklist."""
...
def add_to_blocklist(self, entry: ScammerEntry) -> ScammerEntry:
"""Upsert a seller into the blocklist. Returns the saved entry with id and created_at."""
...
def remove_from_blocklist(self, platform: str, platform_seller_id: str) -> None:
"""Remove a seller from the blocklist."""
...
def list_blocklist(self, platform: str = "ebay") -> list[ScammerEntry]:
"""Return all blocklisted sellers for a platform, newest first."""
...
def clone(self) -> SharedTableProtocol:
"""Create a new independent instance pointing to the same backend."""
...

View file

@ -21,6 +21,10 @@ class Store:
# WAL mode: allows concurrent readers + one writer without blocking
self._conn.execute("PRAGMA journal_mode=WAL")
def clone(self) -> Store:
"""Create a new independent instance pointing to the same database."""
return Store(self._db_path)
# --- Seller ---
def delete_seller_data(self, platform: str, platform_seller_id: str) -> None:

View file

@ -6,7 +6,7 @@ Snipe LLMRouter shim — tri-level config path priority.
Config lookup order:
1. <repo>/config/llm.yaml per-install local override
2. ~/.config/circuitforge/llm.yaml user-level config (circuitforge-core default)
3. env-var auto-config (ANTHROPIC_API_KEY, OPENAI_API_KEY, OLLAMA_HOST, CF_ORCH_URL)
3. env-var auto-config (ANTHROPIC_API_KEY, OPENAI_API_KEY, OLLAMA_HOST, GPU_SERVER_URL)
"""
from pathlib import Path

View file

@ -22,7 +22,7 @@ _SHOPPING_API_INTER_REQUEST_DELAY = 0.5 # seconds between successive calls
_SELLER_ENRICH_TTL_HOURS = 24 # skip re-enrichment within this window
from app.db.models import Listing, MarketComp, Seller
from app.db.store import Store
from app.db.protocol import SharedTableProtocol
from app.platforms import PlatformAdapter, SearchFilters
from app.platforms.ebay.auth import EbayTokenManager
from app.platforms.ebay.normaliser import normalise_listing, normalise_seller
@ -67,7 +67,7 @@ BROWSE_BASE = {
class EbayAdapter(PlatformAdapter):
def __init__(self, token_manager: EbayTokenManager, shared_store: Store, env: str = "production"):
def __init__(self, token_manager: EbayTokenManager, shared_store: SharedTableProtocol, env: str = "production"):
self._tokens = token_manager
self._store = shared_store
self._env = env

View file

@ -25,7 +25,7 @@ log = logging.getLogger(__name__)
from bs4 import BeautifulSoup
from app.db.models import Listing, MarketComp, Seller
from app.db.store import Store
from app.db.protocol import SharedTableProtocol
from app.platforms import PlatformAdapter, SearchFilters
EBAY_SEARCH_URL = "https://www.ebay.com/sch/i.html"
@ -286,7 +286,7 @@ class ScrapedEbayAdapter(PlatformAdapter):
category_history) cause TrustScorer to set score_is_partial=True.
"""
def __init__(self, shared_store: Store, delay: float = 1.0):
def __init__(self, shared_store: SharedTableProtocol, delay: float = 1.0):
self._store = shared_store
self._delay = delay
@ -374,8 +374,6 @@ class ScrapedEbayAdapter(PlatformAdapter):
Does not raise failures per-seller are silently skipped so the main
search response is never blocked.
"""
db_path = self._store._db_path # capture for thread-local Store creation
def _enrich_one(item: tuple[str, str]) -> None:
seller_id, listing_id = item
try:
@ -388,7 +386,7 @@ class ScrapedEbayAdapter(PlatformAdapter):
)
if age_days is None and fb_count is None:
return # nothing new to write
thread_store = Store(db_path)
thread_store = self._store.clone()
seller = thread_store.get_seller("ebay", seller_id)
if not seller:
log.warning("BTF enrich: seller %s not found in DB", seller_id)

View file

@ -8,9 +8,9 @@ Current task types:
result to trust_scores.photo_analysis_json (Paid tier).
Image assessment routing:
Cloud (CF_ORCH_URL set): allocates via cf-orch task endpoint
Cloud (GPU_SERVER_URL set): allocates via cf-orch task endpoint
product=snipe, task=image_assessment.
Local (no CF_ORCH_URL) or TaskNotFound fallback: uses LLMRouter
Local (no GPU_SERVER_URL) or TaskNotFound fallback: uses LLMRouter
with a vision-capable local backend (moondream2, llava, etc.).
"""
from __future__ import annotations
@ -135,7 +135,7 @@ def _run_trust_photo_analysis(
if listing_title:
user_prompt = f"Assess this eBay listing image: {listing_title}"
cforch_url = os.getenv("CF_ORCH_URL")
cforch_url = os.getenv("GPU_SERVER_URL") or os.getenv("CF_ORCH_URL")
if cforch_url:
raw = _assess_via_orch(cforch_url, image_data_url, user_prompt)
else:

View file

@ -2,7 +2,7 @@ import hashlib
import math
from app.db.models import Listing, TrustScore
from app.db.store import Store
from app.db.protocol import SharedTableProtocol
from .aggregator import Aggregator
from .metadata import MetadataScorer
@ -12,7 +12,7 @@ from .photo import PhotoScorer
class TrustScorer:
"""Orchestrates metadata + photo scoring for a batch of listings."""
def __init__(self, shared_store: Store):
def __init__(self, shared_store: SharedTableProtocol):
self._store = shared_store
self._meta = MetadataScorer()
self._photo = PhotoScorer()

View file

@ -20,9 +20,12 @@ services:
CLOUD_MODE: "true"
CLOUD_DATA_ROOT: /devl/snipe-cloud-data
# DIRECTUS_JWT_SECRET, HEIMDALL_URL, HEIMDALL_ADMIN_TOKEN — set in .env (never commit)
# CF_ORCH_URL routes LLM query builder through cf-orch for VRAM-aware scheduling.
# GPU_SERVER_URL routes LLM query builder through cf-orch for VRAM-aware scheduling.
# Override in .env to use a different coordinator URL.
CF_ORCH_URL: "http://host.docker.internal:7700"
GPU_SERVER_URL: "http://host.docker.internal:7700"
# SNIPE_SHARED_DB_URL — Postgres DSN for shared tables (sellers, market_comps, blocklist).
# Required for production multi-user deployments. Set in .env (never commit).
# SNIPE_SHARED_DB_URL: "postgresql://snipe:<password>@postgres:5432/snipe_shared"
CF_APP_NAME: snipe
extra_hosts:
- "host.docker.internal:host-gateway"

View file

@ -18,8 +18,8 @@ services:
environment:
- RELOAD=true
# Point the LLM/vision task scheduler at the local cf-orch coordinator.
# Only has effect when CF_ORCH_URL is set (uncomment in .env, or set inline).
# - CF_ORCH_URL=http://10.1.10.71:7700
# Only has effect when GPU_SERVER_URL is set (uncomment in .env, or set inline).
# - GPU_SERVER_URL=http://10.1.10.71:7700
# cf-orch agent — routes trust_photo_analysis vision tasks to the GPU coordinator.
# Only starts when you pass --profile orch:

View file

@ -6,7 +6,7 @@
# (claude_code, copilot) are intentionally excluded here.
#
# CF Orchestrator routes both ollama and vllm allocations for VRAM-aware
# scheduling. CF_ORCH_URL must be set in .env for allocations to resolve;
# scheduling. GPU_SERVER_URL must be set in .env for allocations to resolve;
# if cf-orch is unreachable the backend falls back to its static base_url.
#
# Model choice for query builder: llama3.1:8b

View file

@ -8,7 +8,7 @@ version = "0.3.0"
description = "Auction listing monitor and trust scorer"
requires-python = ">=3.11"
dependencies = [
"circuitforge-core>=0.8.0",
"circuitforge-core[community]>=0.8.0",
"streamlit>=1.32",
"requests>=2.31",
"imagehash>=4.3",
@ -24,10 +24,15 @@ dependencies = [
"cryptography>=42.0",
"PyJWT>=2.8",
"httpx>=0.27",
"circuitforge-orch>=0.1.0",
]
[project.optional-dependencies]
orchestration = [
# Paid+ tier only — not published to PyPI. Install from source or Forgejo Packages.
# pip install -e ../circuitforge-orch (dev)
# pip install snipe[orchestration] (self-hosted Paid+)
"circuitforge-orch>=0.1.0",
]
dev = [
"pytest>=8.0",
"pytest-cov>=5.0",

17
tests/conftest.py Normal file
View file

@ -0,0 +1,17 @@
import os
import pytest
def pytest_configure(config):
config.addinivalue_line(
"markers",
"postgres: mark test as requiring a live Postgres instance (SNIPE_SHARED_DB_URL must be set)",
)
@pytest.fixture
def postgres_dsn():
dsn = os.environ.get("SNIPE_SHARED_DB_URL")
if not dsn:
pytest.skip("SNIPE_SHARED_DB_URL not set — skipping Postgres tests")
return dsn

157
tests/db/test_pg_shared.py Normal file
View file

@ -0,0 +1,157 @@
"""Tests for SnipeSharedStore — requires live Postgres via SNIPE_SHARED_DB_URL."""
import pytest
from app.db.models import MarketComp, Seller
from app.db.pg_shared import SnipeSharedDB, SnipeSharedStore
from app.db.protocol import SharedTableProtocol
@pytest.mark.postgres
def test_snipe_shared_store_satisfies_protocol(postgres_dsn):
assert issubclass(SnipeSharedStore, SharedTableProtocol)
@pytest.mark.postgres
def test_save_and_get_seller(postgres_dsn):
db = SnipeSharedDB(postgres_dsn)
db.run_migrations()
store = SnipeSharedStore(db)
seller = Seller(
platform="ebay",
platform_seller_id="test-seller-001",
username="testseller",
account_age_days=365,
feedback_count=100,
feedback_ratio=0.99,
category_history_json='{"electronics": 5}',
)
store.save_seller(seller)
result = store.get_seller("ebay", "test-seller-001")
assert result is not None
assert result.username == "testseller"
assert result.feedback_count == 100
store.delete_seller_data("ebay", "test-seller-001")
db.close()
@pytest.mark.postgres
def test_save_sellers_coalesce_preserves_age(postgres_dsn):
db = SnipeSharedDB(postgres_dsn)
db.run_migrations()
store = SnipeSharedStore(db)
seller_with_age = Seller(
platform="ebay", platform_seller_id="coalesce-test",
username="u", account_age_days=730,
feedback_count=50, feedback_ratio=0.95, category_history_json="{}",
)
store.save_seller(seller_with_age)
seller_without_age = Seller(
platform="ebay", platform_seller_id="coalesce-test",
username="u", account_age_days=None,
feedback_count=60, feedback_ratio=0.96, category_history_json="{}",
)
store.save_sellers([seller_without_age])
result = store.get_seller("ebay", "coalesce-test")
assert result.account_age_days == 730
assert result.feedback_count == 60
store.delete_seller_data("ebay", "coalesce-test")
db.close()
@pytest.mark.postgres
def test_market_comp_cache(postgres_dsn):
from datetime import datetime, timedelta, timezone
db = SnipeSharedDB(postgres_dsn)
db.run_migrations()
store = SnipeSharedStore(db)
expires = (datetime.now(timezone.utc) + timedelta(hours=1)).isoformat()
comp = MarketComp(
platform="ebay", query_hash="abc123",
median_price=49.99, sample_count=10, expires_at=expires,
)
store.save_market_comp(comp)
result = store.get_market_comp("ebay", "abc123")
assert result is not None
assert result.median_price == 49.99
db.close()
@pytest.mark.postgres
def test_reported_sellers(postgres_dsn):
db = SnipeSharedDB(postgres_dsn)
db.run_migrations()
store = SnipeSharedStore(db)
store.mark_reported("ebay", "bad-seller-99", username="badguy")
reported = store.list_reported("ebay")
assert "bad-seller-99" in reported
store.mark_reported("ebay", "bad-seller-99") # idempotent
db.close()
@pytest.mark.postgres
def test_clone_returns_self(postgres_dsn):
db = SnipeSharedDB(postgres_dsn)
store = SnipeSharedStore(db)
assert store.clone() is store
db.close()
@pytest.mark.postgres
def test_blocklist_add_get_remove(postgres_dsn):
from app.db.models import ScammerEntry
db = SnipeSharedDB(postgres_dsn)
db.run_migrations()
store = SnipeSharedStore(db)
assert not store.is_blocklisted("ebay", "bad-999")
entry = store.add_to_blocklist(ScammerEntry(
platform="ebay", platform_seller_id="bad-999",
username="scammer1", reason="sold fakes", source="manual",
))
assert entry.id is not None
assert store.is_blocklisted("ebay", "bad-999")
entries = store.list_blocklist("ebay")
assert any(e.platform_seller_id == "bad-999" for e in entries)
store.remove_from_blocklist("ebay", "bad-999")
assert not store.is_blocklisted("ebay", "bad-999")
db.close()
@pytest.mark.postgres
def test_blocklist_upsert_is_idempotent(postgres_dsn):
from app.db.models import ScammerEntry
db = SnipeSharedDB(postgres_dsn)
db.run_migrations()
store = SnipeSharedStore(db)
store.add_to_blocklist(ScammerEntry(
platform="ebay", platform_seller_id="dup-test",
username="seller", reason="reason1", source="manual",
))
# Second add — should not raise, should update username but preserve reason via COALESCE
store.add_to_blocklist(ScammerEntry(
platform="ebay", platform_seller_id="dup-test",
username="seller_updated", reason=None, source="community",
))
entries = [e for e in store.list_blocklist("ebay") if e.platform_seller_id == "dup-test"]
assert len(entries) == 1
assert entries[0].username == "seller_updated"
assert entries[0].reason == "reason1" # COALESCE preserved original reason
store.remove_from_blocklist("ebay", "dup-test")
db.close()

39
tests/db/test_protocol.py Normal file
View file

@ -0,0 +1,39 @@
"""Verify Store satisfies SharedTableProtocol at import time."""
from app.db.protocol import SharedTableProtocol
from app.db.store import Store
def test_store_satisfies_protocol():
assert issubclass(Store, SharedTableProtocol)
def test_store_clone_returns_new_instance(tmp_path):
db = tmp_path / "test.db"
s = Store(db)
clone = s.clone()
assert isinstance(clone, Store)
assert clone is not s
assert clone._db_path == db
def test_ebay_adapter_accepts_protocol():
from app.platforms.ebay.adapter import EbayAdapter
import tempfile
import pathlib
from unittest.mock import MagicMock
with tempfile.TemporaryDirectory() as tmp:
s = Store(pathlib.Path(tmp) / "t.db")
adapter = EbayAdapter(token_manager=MagicMock(), shared_store=s)
assert adapter._store is s
def test_scraped_adapter_no_db_path_ref():
from app.platforms.ebay.scraper import ScrapedEbayAdapter
import tempfile
import pathlib
with tempfile.TemporaryDirectory() as tmp:
s = Store(pathlib.Path(tmp) / "t.db")
adapter = ScrapedEbayAdapter(shared_store=s)
assert not hasattr(adapter, '_db_path_ref')

View file

@ -173,7 +173,7 @@ def _make_orch_client_mock(vision_json: str) -> MagicMock:
def test_run_task_photo_analysis_orch_success(tmp_db: Path):
"""Cloud path: CFOrchClient.task_allocate is used when CF_ORCH_URL is set."""
"""Cloud path: CFOrchClient.task_allocate is used when GPU_SERVER_URL is set."""
task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS)
chat_resp = MagicMock()
@ -181,7 +181,7 @@ def test_run_task_photo_analysis_orch_success(tmp_db: Path):
chat_resp.raise_for_status = MagicMock()
with patch("app.tasks.runner.requests") as mock_req, \
patch.dict("os.environ", {"CF_ORCH_URL": "http://cf-orch.local:8700"}), \
patch.dict("os.environ", {"GPU_SERVER_URL": "http://cf-orch.local:8700"}), \
patch("app.tasks.runner.httpx") as mock_httpx, \
patch("circuitforge_orch.client.CFOrchClient") as MockClient:
@ -216,7 +216,7 @@ def test_run_task_photo_analysis_orch_uses_image_assessment_task(tmp_db: Path):
chat_resp.raise_for_status = MagicMock()
with patch("app.tasks.runner.requests") as mock_req, \
patch.dict("os.environ", {"CF_ORCH_URL": "http://cf-orch.local:8700"}), \
patch.dict("os.environ", {"GPU_SERVER_URL": "http://cf-orch.local:8700"}), \
patch("app.tasks.runner.httpx") as mock_httpx, \
patch("circuitforge_orch.client.CFOrchClient") as MockClient:
@ -248,7 +248,7 @@ def test_run_task_photo_analysis_orch_sends_image_url_content(tmp_db: Path):
return resp
with patch("app.tasks.runner.requests") as mock_req, \
patch.dict("os.environ", {"CF_ORCH_URL": "http://cf-orch.local:8700"}), \
patch.dict("os.environ", {"GPU_SERVER_URL": "http://cf-orch.local:8700"}), \
patch("app.tasks.runner.httpx") as mock_httpx, \
patch("circuitforge_orch.client.CFOrchClient") as MockClient:
@ -282,7 +282,7 @@ def test_run_task_photo_analysis_orch_task_not_found_falls_back(tmp_db: Path):
client_instance.task_allocate.return_value = cm
with patch("app.tasks.runner.requests") as mock_req, \
patch.dict("os.environ", {"CF_ORCH_URL": "http://cf-orch.local:8700"}), \
patch.dict("os.environ", {"GPU_SERVER_URL": "http://cf-orch.local:8700"}), \
patch("circuitforge_orch.client.CFOrchClient", return_value=client_instance), \
patch("app.tasks.runner._assess_via_local_llm", return_value=_VISION_JSON) as mock_local: