Compare commits

19 commits
v0.6.0 ... main

Author SHA1 Message Date
9bd6d9513e docs: add LLM development disclosure to README
Humans own design, architecture, code review, testing, and
verification. LLMs are part of our development workflow.
Links to circuitforge.tech/positions for our full position.
2026-05-28 08:20:17 -07:00
341d66d5f0 feat: migrate shared_db (sellers/market_comps/blocklist) from SQLite to Postgres (#45)
Three-layer migration: SQLite Store remains for per-user tables (listings,
trust_scores, background_tasks, community_signals). Postgres takes over
for all high-contention shared tables.

Closes: #45
2026-05-22 15:49:02 -07:00
e34c2b9982 feat(db): wire Postgres shared backend into main.py and extend protocol
SharedTableProtocol now covers the full shared-table surface:
  - sellers, market_comps, reported_sellers (already in SnipeSharedStore)
  - scammer_blocklist (new — is_blocklisted, add/remove/list_blocklist)
  - refresh_seller_categories (reads per-user SQLite, writes to Postgres)

TrustScorer updated to accept SharedTableProtocol (was Store).

api/main.py:
  - _pg_shared_store global + _make_shared_store(path) helper
  - Lifespan init: SNIPE_SHARED_DB_URL → SnipeSharedDB + SnipeSharedStore
  - All Store(shared_db) calls for shared tables replaced with
    _make_shared_store(shared_db) or shared_store.clone()
  - Blocklist endpoints use _make_shared_store (Postgres when configured)
  - Community signals stay SQLite-only (low-write, not in protocol)

Postgres migration 001: scammer_blocklist table added.
8 blocklist tests added (gated behind SNIPE_SHARED_DB_URL / @pytest.mark.postgres).
.env.example: SNIPE_SHARED_DB_URL documented.
compose.cloud.yml: GPU_SERVER_URL + SNIPE_SHARED_DB_URL comment added.

248 passed, 8 skipped (postgres-gated).

Closes: #45
2026-05-22 15:47:36 -07:00
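The backend selection described in e34c2b9982 can be sketched roughly as below. This is a minimal illustration, not the project's code: `SharedTables`, `InMemoryShared`, and `make_shared_store` are hypothetical stand-ins for `SharedTableProtocol`, the two real backends, and `_make_shared_store`. Only `is_blocklisted` and `clone()` are method names taken from the commit message, and their signatures here are assumed.

```python
from typing import Callable, Optional, Protocol, Set


class SharedTables(Protocol):
    """Hypothetical subset of the shared-table surface (signatures assumed)."""

    def is_blocklisted(self, seller_id: str) -> bool: ...

    def clone(self) -> "SharedTables": ...


class InMemoryShared:
    """Toy backend standing in for either the SQLite Store or the Postgres store."""

    def __init__(self, blocked: Optional[Set[str]] = None) -> None:
        self._blocked = blocked if blocked is not None else set()

    def is_blocklisted(self, seller_id: str) -> bool:
        return seller_id in self._blocked

    def clone(self) -> "InMemoryShared":
        # A clone shares the underlying data, the way pooled stores would
        # share one connection pool (assumption).
        return InMemoryShared(self._blocked)


def make_shared_store(
    pg_store: Optional[SharedTables],
    sqlite_factory: Callable[[], SharedTables],
) -> SharedTables:
    """Prefer the Postgres-backed store when one was configured at startup."""
    if pg_store is not None:
        return pg_store.clone()
    return sqlite_factory()
```

Callers that only depend on the protocol (TrustScorer in the commit) do not need to know which backend they were handed.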
cc997c09e3 refactor: rename CF_ORCH_URL → GPU_SERVER_URL (backward-compat alias kept)
GPU_SERVER_URL is the self-explanatory name a self-hoster can understand
without knowing CircuitForge internals. CF_ORCH_URL continues to work as
a drop-in fallback alias (runner.py, main.py both check GPU_SERVER_URL
first, then CF_ORCH_URL).

Updated everywhere the env var is referenced or documented:
- app/tasks/runner.py
- api/main.py
- app/llm/router.py
- .env.example (alias note added)
- compose.override.yml
- compose.cloud.yml
- config/llm.cloud.yaml
- tests/test_tasks/test_runner.py (primary key updated; 13/13 still pass)

Follows the GPU_SERVER_URL convention established in kiwi (see kiwi
app/core/config.py).

Closes: #55
2026-05-21 15:05:27 -07:00
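The lookup order from cc997c09e3 (GPU_SERVER_URL first, then the CF_ORCH_URL alias) could look something like this. The function name is hypothetical, and treating an empty string as unset is an assumption rather than something the commit states.

```python
import os
from typing import Mapping, Optional


def resolve_gpu_server_url(env: Mapping[str, str] = os.environ) -> Optional[str]:
    """Return the coordinator URL, preferring GPU_SERVER_URL over the legacy alias."""
    # Assumption: a blank value counts as unset, so an empty GPU_SERVER_URL=
    # line in .env does not mask a configured CF_ORCH_URL.
    return env.get("GPU_SERVER_URL") or env.get("CF_ORCH_URL") or None
```

Passing the environment as a mapping keeps the helper easy to test without mutating `os.environ`.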
c10a481ce3 chore: move circuitforge-orch to optional extras group
Free-tier users get a clean `pip install snipe` (or pip install -e .)
without hitting a resolution error for circuitforge-orch, which is not
on PyPI and is a Paid+ feature.

Runtime tier gate in runner.py / main.py already handles the missing-
package case gracefully (falls back to LLMRouter when GPU_SERVER_URL
is unset). Install-time gating was a violation of the CF MIT boundary.

Upgrade path: pip install snipe[orchestration]

Closes: #56
2026-05-21 15:05:11 -07:00
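A runtime gate of the kind c10a481ce3 relies on might be sketched as follows. The module name `circuitforge_orch` and the function `pick_backend` are assumptions for illustration. The commit only states the fallback for the case where GPU_SERVER_URL is unset, so falling back when the URL is set but the package is missing is also an assumption.

```python
import importlib.util
from typing import Optional


def pick_backend(gpu_server_url: Optional[str],
                 orch_module: str = "circuitforge_orch") -> str:
    """Return "orch" only when a URL is configured and the optional package imports."""
    if not gpu_server_url:
        return "local"  # no coordinator configured: use the local LLM path
    # find_spec returns None for a missing top-level module instead of raising.
    if importlib.util.find_spec(orch_module) is None:
        return "local"  # optional extra not installed (assumed behaviour)
    return "orch"
```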
80ac13e69f refactor(adapters): accept SharedTableProtocol; replace thread-local Store pattern with clone()
2026-05-18 09:12:00 -07:00
9d8b627fe1 fix(db): remove redundant _snipe_shared_migrations DDL from SQL file (runner owns it)
2026-05-18 09:09:35 -07:00
1d6556072f feat(db): SnipeSharedStore — Postgres backend for sellers, market_comps, reported_sellers
Implements SharedTableProtocol against a ThreadedConnectionPool (psycopg2).
SnipeSharedDB handles pool lifecycle and idempotent SQL migrations.
save_sellers uses COALESCE to preserve existing account_age_days when the
new record omits it. All 6 Postgres tests skip cleanly without SNIPE_SHARED_DB_URL.
2026-05-18 09:07:32 -07:00
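The COALESCE behaviour mentioned in 1d6556072f can be illustrated with a small upsert. This sketch uses the standard-library `sqlite3` module so it runs anywhere; the real store targets Postgres through psycopg2, where the placeholders would be `%s`. The table layout and column names other than `account_age_days` are hypothetical.

```python
import sqlite3

# On conflict, keep the stored account_age_days when the incoming row has NULL.
UPSERT_SELLER = """
INSERT INTO sellers (platform_seller_id, feedback_count, account_age_days)
VALUES (?, ?, ?)
ON CONFLICT (platform_seller_id) DO UPDATE SET
    feedback_count   = excluded.feedback_count,
    account_age_days = COALESCE(excluded.account_age_days, sellers.account_age_days)
"""


def save_seller(conn, seller_id, feedback_count, account_age_days=None):
    """Insert or update one seller row without clobbering a known account age."""
    conn.execute(UPSERT_SELLER, (seller_id, feedback_count, account_age_days))


def demo():
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE sellers ("
        "platform_seller_id TEXT PRIMARY KEY, "
        "feedback_count INTEGER, "
        "account_age_days INTEGER)"
    )
    save_seller(conn, "seller-1", 10, 400)   # first sighting, age known
    save_seller(conn, "seller-1", 12)        # later record omits the age
    return conn.execute(
        "SELECT feedback_count, account_age_days FROM sellers"
    ).fetchone()
```

After the second call the feedback count is updated while the previously stored account age is retained.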
78809c761e feat(db): SharedTableProtocol + Store.clone() for dual-backend support
2026-05-18 08:53:47 -07:00
6fbcf90740 feat(db): Postgres schema for shared sellers, market_comps, reported_sellers
2026-05-18 08:31:11 -07:00
5ddfbece8e chore(deps): add psycopg2-binary for shared Postgres migration
2026-05-18 08:30:34 -07:00
4dd44fdafb docs: bump version badge to match latest Forgejo release
2026-05-17 11:19:13 -07:00
263c8522ee feat(tasks): migrate trust_photo_analysis to cf-orch image_assessment task endpoint (#43)
- _assess_via_orch(): uses CFOrchClient.task_allocate('snipe', 'image_assessment')
  with multimodal image_url payload; falls back to local LLMRouter on TaskNotFound
- _assess_via_local_llm(): lazy LLMRouter import, unchanged local path
- CF_ORCH_URL env var selects path at runtime; local users unaffected
- Added circuitforge-orch>=0.1.0 to main dependencies
- 5 new runner tests covering orch happy path, task tag, image_url payload format,
  TaskNotFound fallback, and non-JSON response handling (13 tests total, 244 suite)
2026-05-13 15:43:18 -07:00
1bf95bba2a feat(llm): migrate query_translator to cf-orch task endpoint for cloud, keep LLMRouter for local (#54)
QueryTranslator now supports two backends chosen at startup:
- CF_ORCH_URL set: allocate via POST /api/inference/task (product=snipe,
  task=query_translation), call the allocated cf-text service, release the
  slot in a finally block to guarantee the VRAM lease is freed.
- CF_ORCH_URL absent: existing LLMRouter path unchanged (ollama/vllm/api keys).

Also moves httpx from dev-only to main dependencies (already used by mcp/server.py).
2026-05-13 15:22:09 -07:00
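The allocate / call / release flow from 1bf95bba2a boils down to a try/finally around the inference call. The sketch below deliberately avoids any real cf-orch client API: `Lease`, `run_with_lease`, and the callables passed in are hypothetical, and no network request is made.

```python
from typing import Callable, Protocol, TypeVar


class Lease(Protocol):
    """Anything that holds a slot and can give it back."""

    def release(self) -> None: ...


L = TypeVar("L", bound=Lease)
R = TypeVar("R")


def run_with_lease(allocate: Callable[[], L], call: Callable[[L], R]) -> R:
    """Allocate a slot, run the call, and always release the slot afterwards."""
    lease = allocate()  # if allocation itself fails there is nothing to release
    try:
        return call(lease)
    finally:
        # Runs on success and on exception, so the slot is never leaked.
        lease.release()
```

Because the release sits in `finally`, an exception raised by the model call still propagates to the caller after the slot has been freed.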
ae0d4fbc89 docs(screenshots): add search results view showing trust scores, STEAL badges, and market price comparison
2026-05-06 10:19:38 -07:00
8ba07b9766 docs(screenshots): retake hero after CSS theme fix — consistent warm light theme throughout
2026-05-06 09:58:39 -07:00
d7c8a8bca6 docs(readme): landing page rewrite — corrected tagline, hero screenshot, platform table, sniping engine roadmap, split license
2026-05-06 08:51:37 -07:00
108f63b4f2 fix(browser-pool): replace queue with thread-local storage to fix Playwright cross-thread crash (#53)
Playwright's sync API binds its greenlet event loop to the creating thread.
Sharing pre-warmed slots across threads caused "cannot switch to a different
thread" panics under uvicorn. New design: each worker thread owns its own
Playwright instance created lazily on first fetch_html() call. A registry
dict keyed by thread-id lets stop() close all slots at shutdown. Removes
ThreadPoolExecutor warmup and idle-cleanup daemon thread entirely.
2026-05-04 09:27:20 -07:00
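The design in 108f63b4f2 (one lazily created instance per worker thread, plus a registry keyed by thread id for shutdown) can be sketched generically. `PerThreadSlots` is a hypothetical name, and the factory and closer are placeholders for whatever starts and stops a browser; whether a real Playwright instance can be closed safely from a different thread is not addressed here.

```python
import threading
from typing import Callable, Dict, Generic, TypeVar

T = TypeVar("T")


class PerThreadSlots(Generic[T]):
    """Give each thread its own lazily created resource; close them all on stop()."""

    def __init__(self, factory: Callable[[], T], closer: Callable[[T], None]) -> None:
        self._factory = factory
        self._closer = closer
        self._local = threading.local()
        self._registry: Dict[int, T] = {}
        self._lock = threading.Lock()

    def get(self) -> T:
        slot = getattr(self._local, "slot", None)
        if slot is None:
            # First use on this thread: create the resource here so it stays
            # bound to the thread that will use it.
            slot = self._factory()
            self._local.slot = slot
            with self._lock:
                self._registry[threading.get_ident()] = slot
        return slot

    def stop(self) -> None:
        # Snapshot under the lock, then close outside it.
        with self._lock:
            slots = list(self._registry.values())
            self._registry.clear()
        for slot in slots:
            self._closer(slot)
```

No slot is ever handed from one thread to another, which is the property the commit message identifies as the fix for the cross-thread crash.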
bccedb1fe5 fix(trust): treat feedback_ratio=0.0 as missing data for buyer-only/returning sellers (#52)
eBay omits the 12-month positive percentage for returning sellers and
buyer-only accounts with no recent sales. Previously ratio=0.0 with
count>0 triggered established_bad_actor; now it returns None from the
scorer (score_is_partial=True) and emits a soft no_recent_seller_data
flag instead. ratio=0.0 with count=0 is still treated as no-history.
2026-05-04 09:24:27 -07:00
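The rule change in bccedb1fe5 can be written out as a small decision function. This is a sketch under assumptions: the ratio is taken to be a fraction between 0.0 and 1.0, the `FeedbackSignal` type and its `no_history` field are hypothetical, and the 80% / 20-review threshold for `established_bad_actor` comes from the README text in this diff rather than from the scorer source.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class FeedbackSignal:
    ratio: Optional[float]            # None means "treat the signal as missing"
    score_is_partial: bool = False
    no_history: bool = False          # hypothetical marker for the count == 0 case
    flags: List[str] = field(default_factory=list)


def interpret_feedback(ratio: float, count: int) -> FeedbackSignal:
    """Classify a feedback ratio/count pair following the commit's description."""
    if ratio == 0.0 and count > 0:
        # eBay omitted the 12-month percentage: missing data, not a bad actor.
        return FeedbackSignal(None, score_is_partial=True,
                              flags=["no_recent_seller_data"])
    if ratio == 0.0 and count == 0:
        # Still the no-history case; how the real scorer represents it
        # is not shown in the commit.
        return FeedbackSignal(None, no_history=True)
    flags = []
    if ratio < 0.80 and count >= 20:
        flags.append("established_bad_actor")
    return FeedbackSignal(ratio, flags=flags)
```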
32 changed files with 1699 additions and 682 deletions

@@ -98,16 +98,25 @@ CF_APP_NAME=snipe
 # OLLAMA_HOST=http://localhost:11434
 # OLLAMA_MODEL=llava:7b
-# CF Orchestrator — routes vision/LLM tasks to a cf-orch coordinator for VRAM management.
+# GPU Server — routes vision/LLM tasks to a cf-orch coordinator for VRAM management.
 # Self-hosted: point at a local cf-orch coordinator if you have one running.
 # Cloud (internal): managed coordinator at orch.circuitforge.tech.
 # Leave unset to run vision tasks inline (no VRAM coordination).
-# CF_ORCH_URL=http://10.1.10.71:7700
+# GPU_SERVER_URL=http://10.1.10.71:7700
+#
+# CF_ORCH_URL is accepted as a backward-compat alias for GPU_SERVER_URL.
 #
 # cf-orch agent (compose --profile orch) — coordinator URL for the sidecar agent.
-# Defaults to CF_ORCH_URL if unset.
+# Defaults to GPU_SERVER_URL if unset.
 # CF_ORCH_COORDINATOR_URL=http://10.1.10.71:7700
+# ── Shared Postgres (optional — strongly recommended for cloud/multi-user) ────
+# When set, sellers, market_comps, reported_sellers, and scammer_blocklist are
+# stored in Postgres instead of SQLite. Required to avoid database-locked errors
+# under concurrent load (>10 simultaneous search users).
+# Cloud instances: set to the cf-postgres DSN. Self-hosted: leave unset for SQLite.
+# SNIPE_SHARED_DB_URL=postgresql://snipe:<password>@localhost:5432/snipe_shared
 # ── Community DB (optional) ──────────────────────────────────────────────────
 # When set, seller trust signals (confirmed scammers added to blocklist) are
 # published to the shared community PostgreSQL for cross-user signal aggregation.

@@ -5,6 +5,7 @@ WORKDIR /app
 # System deps for Playwright/Chromium
 RUN apt-get update && apt-get install -y --no-install-recommends \
 xvfb \
+libpq-dev \
 && rm -rf /var/lib/apt/lists/*
 # Install circuitforge-core from sibling directory (compose sets context: ..)

364
README.md

@@ -1,29 +1,87 @@
# Snipe — Auction Sniping & Listing Intelligence <!-- Logo coming soon — replace docs/snipe-logo.svg when final icon ships -->
<div align="center">
<img src="docs/snipe-logo.svg" alt="Snipe logo" width="120" />
> *Part of the Circuit Forge LLC "AI for the tasks you hate most" suite.* # Snipe
**Status:** Active — eBay listing intelligence MVP complete; Mercari search + trust scoring live. Auction sniping engine and additional platforms are next. **Auction intelligence and sniping for people who don't trust the platform.**
**[Documentation](https://docs.circuitforge.tech/snipe/)** · [circuitforge.tech](https://circuitforge.tech) [![License: MIT / BSL 1.1](https://img.shields.io/badge/license-MIT%20%2F%20BSL%201.1-blue)](LICENSE)
[![Status: Beta](https://img.shields.io/badge/status-beta-yellow)]()
[![Version](https://img.shields.io/badge/version-0.5.1-green)](https://git.opensourcesolarpunk.com/Circuit-Forge/snipe/releases)
[![Forgejo](https://img.shields.io/badge/primary%20repo-Forgejo-orange)](https://git.opensourcesolarpunk.com/Circuit-Forge/snipe)
[![Docs](https://img.shields.io/badge/docs-docs.circuitforge.tech%2Fsnipe-green)](https://docs.circuitforge.tech/snipe)
## Quick install (self-hosted) *Part of the Circuit Forge LLC suite — "AI for the tasks the system made hard on purpose."*
</div>
**Requirements:** Docker with Compose plugin, Git. No API keys needed to get started. ---
<table>
<tr>
<td><img src="docs/screenshots/hero.png" alt="Snipe search page with filter panel and feature overview"/></td>
<td><img src="docs/screenshots/results.png" alt="Search results — trust score badges, STEAL price flags, seller feedback, and market price comparison"/></td>
</tr>
</table>
---
## Why Snipe?
Auction platforms are designed to make you act fast and trust blindly. The closing countdown, the hidden price history, the new-account seller with one feedback — all of it is structured against the buyer.
Snipe inverts that. Before you place a bid, you get a trust score built from five independently sourced signals: seller account age, feedback volume, feedback ratio, price versus recent completed sales, and category history. A hard-coded red flag for new accounts or bad actors overrides the composite. Soft flags surface buried damage disclosures, duplicate photos, and listings that have been sitting unsold for weeks. When the listing is priced well below market, you see a STEAL badge — sourced from eBay Marketplace Insights, not from the seller's description.
The sniping engine — precise last-second bid submission with NTP (network time protocol) synchronization and soft-close handling — is next on the roadmap. The intelligence layer is live now.
---
## Features
### Listing intelligence (live)
- **Trust scoring** — five-signal composite score (0–100) per listing: account age, feedback count, feedback ratio, price vs. market, category history
- **Red flag detection** — hard flags for new accounts and established bad actors; soft flags for damage keywords, evasive language, duplicate photos, long-on-market listings, and significant price drops
- **Price vs. market** — listing price compared against completed-sale medians via eBay Marketplace Insights API (Browse API fallback)
- **Keyword filtering** — must-include (AND / ANY / OR-groups), must-exclude, category, price range; OR-groups expand into multiple targeted queries so eBay relevance doesn't silently drop variants
- **Saved searches** — one-click re-run that restores all filter settings
- **Background enrichment** — seller account age scraped via Playwright + Xvfb (Kasada/Cloudflare-safe headed Chromium); on-demand re-score per listing without re-searching
- **LLM query builder** — describe what you want in plain language; an LLM builds the search terms (paid tier)
- **Vision photo assessment** — condition scoring from listing photos via moondream2 locally or Claude vision (paid/cloud); VRAM-aware scheduling via circuitforge-core task scheduler
- **Affiliate link builder** — eBay Partner Network wrapping with user BYOK support and per-retailer disclosure
### Platforms
| Platform | Search | Trust scoring | Completed-sale comps |
|----------|--------|---------------|----------------------|
| **eBay** | Browse API + Playwright fallback | All 5 signals | Marketplace Insights + Browse fallback |
| **Mercari** | Playwright scraper | 3/5 signals (partial) | Phase 3 |
| CT Bids, HiBid, AuctionZip, Invaluable, GovPlanet, Bidsquare, Proxibid | Planned | Planned | Planned |
### Auction sniping engine (roadmap)
- NTP-synchronized last-second bid submission
- Soft-close detection and strategy adjustment
- Proxy bid ladder with configurable max
- Human approval gate before any bid executes
- Post-win workflow: payment routing, shipping coordination, provenance documentation
---
## Quick Start
**Requirements:** Docker with Compose plugin, Git. No API keys required to get started.
```bash ```bash
# One-line install — clones to ~/snipe by default # One-line install — clones to ~/snipe by default
bash <(curl -fsSL https://git.opensourcesolarpunk.com/Circuit-Forge/snipe/raw/branch/main/install.sh) bash <(curl -fsSL https://git.opensourcesolarpunk.com/Circuit-Forge/snipe/raw/branch/main/install.sh)
# Or clone manually and run the script:
git clone https://git.opensourcesolarpunk.com/Circuit-Forge/snipe.git
bash snipe/install.sh
``` ```
Then open **http://localhost:8509**. Then open **http://localhost:8509**.
### Manual setup (if you prefer) ### Manual setup
Snipe's API image is built from a parent context that includes `circuitforge-core`. Both repos must sit as siblings in the same directory: Snipe's API image builds from a parent context that includes `circuitforge-core`. Both repos must sit as siblings:
``` ```
workspace/ workspace/
@@ -36,286 +94,88 @@ mkdir snipe-workspace && cd snipe-workspace
git clone https://git.opensourcesolarpunk.com/Circuit-Forge/snipe.git git clone https://git.opensourcesolarpunk.com/Circuit-Forge/snipe.git
git clone https://git.opensourcesolarpunk.com/Circuit-Forge/circuitforge-core.git git clone https://git.opensourcesolarpunk.com/Circuit-Forge/circuitforge-core.git
cd snipe cd snipe
cp .env.example .env # edit if you have eBay API credentials (optional) cp .env.example .env # add eBay API credentials if you have them (optional)
./manage.sh start ./manage.sh start
``` ```
### Optional: eBay API credentials ### Optional: eBay API credentials
Snipe works without any credentials using its Playwright scraper fallback. Adding eBay API credentials unlocks faster searches and inline seller account age (no extra scrape needed): Snipe works without credentials using its Playwright scraper fallback. Adding credentials unlocks faster searches and inline seller account age without an extra scrape:
1. Register at [developer.ebay.com](https://developer.ebay.com/my/keys) 1. Register at [developer.ebay.com](https://developer.ebay.com/my/keys)
2. Copy your Production **App ID** and **Cert ID** into `.env` 2. Copy your Production **App ID** and **Cert ID** into `.env`
3. Restart: `./manage.sh restart` 3. `./manage.sh restart`
--- ---
## What it does ## Tiers
Snipe has two layers that work together: | Tier | What you get |
|------|-------------|
| **Free** | eBay + Mercari search, full trust scoring, keyword filtering, saved searches — local LLM only |
| **Paid** | LLM query builder, background saved-search monitoring with alerts, cloud LLM option |
| **Premium** | Vision photo condition assessment, fine-tuned trust models, multi-user |
| **Ultra** | Human-in-the-loop operator — handles CAPTCHAs, phone calls, anything automation can't |
**Layer 1 — Listing intelligence (MVP, implemented)** License key format: `CFG-SNPE-XXXX-XXXX-XXXX`
Before you bid, Snipe tells you whether a listing is worth your time. It fetches eBay listings, scores each seller's trustworthiness across five signals, flags suspicious pricing relative to completed sales, and surfaces red flags like new accounts, cosmetic damage buried in titles, and listings that have been sitting unsold for weeks.
**Layer 2 — Auction sniping (roadmap)**
Snipe manages the bid itself: monitors listings across platforms, schedules last-second bids, handles soft-close extensions, and guides you through the post-win logistics (payment routing, shipping coordination, provenance documentation for antiques).
The name is the origin of the word "sniping" — common snipes are notoriously elusive birds, secretive and camouflaged, that flush suddenly from cover. Shooting one required extreme patience, stillness, and a precise last-second shot. That's the auction strategy.
--- ---
## Screenshots
**Landing page — no account required**
![Snipe landing hero showing search bar and three feature tiles: Seller trust score, Price vs. market, Red flag detection](docs/screenshots/01-hero.png)
**Search results with trust scores**
![Search results for vintage film camera listings, each card showing a trust score badge, seller feedback, price, and market comparison](docs/screenshots/02-results.png)
**STEAL badge — price significantly below market**
![Listing cards with STEAL badge highlighting listings priced well below completed sales median](docs/screenshots/03-steal-badge.png)
> Red flag and Triple Red screenshots coming — captured opportunistically from real scammy listings.
---
## Implemented: Listing Intelligence
### Supported platforms
| Platform | Search | Trust scoring | Completed-sales comps |
|----------|--------|---------------|-----------------------|
| **eBay** | ✅ Browse API + Playwright fallback | ✅ All 5 signals | ✅ Marketplace Insights + Browse fallback |
| **Mercari** | ✅ Playwright scraper | ✅ Partial (3/5 signals) | ⏳ Phase 3 |
Switch between platforms via the tab picker in the search UI. All platforms share the same Playwright + Xvfb scraping stack (Cloudflare/Kasada-safe headed Chromium).
### eBay Listing Intelligence
### Search & filtering
- Full-text eBay search via Browse API (with Playwright scraper fallback when no API credentials configured)
- Price range, must-include keywords (AND / ANY / OR-groups mode), must-exclude terms, eBay category filter
- OR-group mode expands keyword combinations into multiple targeted queries and deduplicates results — eBay relevance won't silently drop variants
- Pages-to-fetch control: each Browse API page returns up to 200 listings
- Saved searches with one-click re-run that restores all filter settings
### Seller trust scoring
Five signals, each scored 0–20, composited to 0–100:
| Signal | What it measures |
|--------|-----------------|
| `account_age` | Days since eBay account registration |
| `feedback_count` | Total feedback received |
| `feedback_ratio` | Positive feedback percentage |
| `price_vs_market` | Listing price vs. median of recent completed sales |
| `category_history` | Whether seller has history selling in this category |
Scores are marked **partial** when signals are unavailable (e.g. account age not yet enriched). Partial scores are displayed with a visual indicator rather than penalizing the seller for missing data.
### Red flags
Hard filters that override the composite score:
- `new_account` — account registered within 7 days
- `established_bad_actor` — feedback ratio < 80% with 20+ reviews
Soft flags surfaced as warnings:
- `account_under_30_days` — account under 30 days old
- `low_feedback_count` — fewer than 10 reviews
- `suspicious_price` — listing price below 50% of market median *(suppressed automatically when the search returns a heterogeneous price distribution — e.g. mixed laptop generations — to prevent false positives)*
- `duplicate_photo` — same image found on another listing (perceptual hash)
- `scratch_dent_mentioned` — title keywords indicating cosmetic damage, functional problems, or evasive language (see below)
- `long_on_market` — listing has been seen 5+ times over 14+ days without selling
- `significant_price_drop` — current price more than 20% below first-seen price
### Scratch & dent title detection
Scans listing titles for signals the item may have undisclosed damage or problems:
- **Explicit damage**: scratch, scuff, dent, crack, chip, blemish, worn
- **Condition catch-alls**: as is, for parts, parts only, spares or repair
- **Evasive redirects**: "see description", "read description", "see photos for" (seller hiding damage detail in listing body)
- **Functional problems**: "not working", "stopped working", "no power", "dead on arrival", "powers on but", "faulty", "broken screen/hinge/port"
- **DIY/repair listings**: "needs repair", "needs tlc", "project laptop", "for repair", "sold as is"
### Seller enrichment
- **Inline (API adapter)**: account age filled from Browse API `registrationDate` field
- **Background (scraper)**: `/itm/` listing pages scraped for seller "Joined" date via Playwright + Xvfb (Kasada-safe headed Chromium)
- **On-demand**: ↻ button on any listing card triggers `POST /api/enrich` — runs enrichment and re-scores without waiting for a second search
- **Category history**: derived from the seller's accumulated listing data (Browse API `categories` field); improves with every search, no extra API calls
### Affiliate link builder
Listing cards surface eBay affiliate-wrapped URLs. Uses `circuitforge_core.affiliates.wrap_url` — resolution order: user opted out → plain URL; user has BYOK affiliate ID → their ID; CF env var set (`EBAY_AFFILIATE_ID`) → CF's ID; otherwise plain URL. Users can configure their own eBay Partner Network ID or opt out entirely in Settings.
Disclosure tooltip appears on first encounter per-session and on each wrapped link (per-retailer copy from `get_disclosure_text`).
### Feedback FAB
In-app feedback button (bottom-right FAB) opens a modal: title, description, optional screenshot. Posts to the CF feedback endpoint. Status probed on load; FAB hidden if endpoint unreachable.
### Vision task scheduling
Photo condition assessment tasks queued through `circuitforge_core.tasks.TaskScheduler` — VRAM-aware slot management shared with any other LLM workloads on the same host. Runs moondream2 locally (free tier) or Claude vision (paid/cloud). Results stored per-listing and update the trust score card.
### Market price comparison
Completed sales fetched via eBay Marketplace Insights API (with Browse API fallback for app tiers that don't have Insights access). Median stored per query hash, used to score `price_vs_market` across all listings in a search.
### Adapters
| Adapter | When used | Signals available |
|---------|-----------|-------------------|
| Browse API (`api`) | eBay API credentials configured | All signals; account age inline |
| Playwright scraper (`scraper`) | No credentials / forced | All signals except account age (async BTF enrichment) |
| `auto` (default) | — | API if credentials present, scraper otherwise |
### Mercari Listing Intelligence
Search Mercari US via headed Chromium + playwright-stealth, bypassing Cloudflare Turnstile. Uses the same `BrowserPool` as the eBay scraper.
**Trust signal coverage:**
| Signal | Source | Available |
|--------|--------|-----------|
| `feedback_count` | `NumSales` on listing page | ✅ |
| `feedback_ratio` | `ReviewStarsWrapper[data-stars]` ÷ 5 | ✅ |
| `price_vs_market` | Computed from comps (Phase 3) | ⏳ |
| `account_age_days` | Seller profile page (not yet fetched) | ❌ |
| `category_history` | Not exposed in Mercari HTML | ❌ |
All Mercari scores are marked **partial** (`score_is_partial=True`) because account age and category history are unavailable. The trust scorer handles partial scores correctly — missing signals don't penalise the seller.
**Design note:** `seller_platform_id` stores the Mercari `product_id` (e.g. `m86032668393`) rather than the seller username, because seller identity isn't available from search results HTML. `get_seller()` resolves the product ID by fetching the individual listing page.
---
## Stack
| Layer | Tech | Port |
|-------|------|------|
| Frontend | Vue 3 + Pinia + UnoCSS + Vite (nginx) | 8509 |
| API | FastAPI (uvicorn) | 8510 |
| Scraper | Playwright + playwright-stealth + Xvfb | — |
| DB | SQLite (`data/snipe.db`) | — |
| Core | circuitforge-core (editable install) | — |
## Running ## Running
```bash ```bash
./manage.sh start # start all services ./manage.sh start # start all services
./manage.sh stop # stop ./manage.sh stop # stop
./manage.sh restart # restart
./manage.sh logs # tail logs ./manage.sh logs # tail logs
./manage.sh open # open in browser ./manage.sh open # open in browser
``` ```
Cloud stack (shared DB, multi-user): ---
```bash
docker compose -f compose.cloud.yml -p snipe-cloud up -d ## Stack
docker compose -f compose.cloud.yml -p snipe-cloud build api # after Python changes
``` | Layer | Technology | Port |
|-------|-----------|------|
| Frontend | Vue 3 + Pinia + UnoCSS + Vite (served via nginx) | 8509 |
| API | FastAPI (uvicorn) | 8510 |
| Scraper | Playwright + playwright-stealth + Xvfb (Kasada/Cloudflare-safe headed Chromium) | — |
| Database | SQLite (`data/snipe.db`) | — |
| Core | circuitforge-core (editable install) | — |
The scraper stack uses headed Chromium via Xvfb (X virtual framebuffer) with playwright-stealth for all platform access. Headless and `requests`-based approaches are blocked by eBay and Mercari.
--- ---
## Roadmap ## Documentation
### Intelligence features Full documentation at **[docs.circuitforge.tech/snipe](https://docs.circuitforge.tech/snipe)** — setup guide, trust scoring algorithm, platform adapter reference, API docs, and self-hosting notes.
| Issue | Feature |
|-------|---------|
| [#5](https://git.opensourcesolarpunk.com/Circuit-Forge/snipe/issues/5) | UPC/product lookup → LLM-crafted search terms (paid tier) |
| [#12](https://git.opensourcesolarpunk.com/Circuit-Forge/snipe/issues/12) | Background saved-search monitoring with configurable alerts |
| [#21](https://git.opensourcesolarpunk.com/Circuit-Forge/snipe/issues/21) | Vision classification pipeline — condition scoring, listing quality, fraud signals |
| [#43](https://git.opensourcesolarpunk.com/Circuit-Forge/snipe/issues/43) | Wire photo analysis task to cf-orch (VRAM-aware scheduling) |
| [#51](https://git.opensourcesolarpunk.com/Circuit-Forge/snipe/issues/51) | Reranker: semantic filter before trust scoring |
| [#52](https://git.opensourcesolarpunk.com/Circuit-Forge/snipe/issues/52) | Trust score fix: exclude buyer-only feedback from `feedback_count` |
| [#41](https://git.opensourcesolarpunk.com/Circuit-Forge/snipe/issues/41) | Additional theme variants — solarized, high-contrast, colorblind-safe |
### Platform expansion
| Issue | Feature |
|-------|---------|
| ✅ shipped | Mercari US — search + partial trust scoring |
| [#53](https://git.opensourcesolarpunk.com/Circuit-Forge/snipe/issues/53) | BrowserPool thread-safety — eliminate per-request cold-start (~10s) |
| [#10](https://git.opensourcesolarpunk.com/Circuit-Forge/snipe/issues/10) | CT Bids, HiBid, AuctionZip, Invaluable, GovPlanet, Bidsquare, Proxibid |
| [#46](https://git.opensourcesolarpunk.com/Circuit-Forge/snipe/issues/46) | Broadcast trust score verdicts to Fediverse communities via ActivityPub |
### Cloud / infrastructure
| Issue | Feature |
|-------|---------|
| [#7](https://git.opensourcesolarpunk.com/Circuit-Forge/snipe/issues/7) | Shared image hash DB — requires explicit opt-in consent (CF privacy-by-architecture) |
| [#45](https://git.opensourcesolarpunk.com/Circuit-Forge/snipe/issues/45) | Migrate shared seller/comps DB from SQLite to Postgres |
### Auction sniping engine
| Issue | Feature |
|-------|---------|
| [#9](https://git.opensourcesolarpunk.com/Circuit-Forge/snipe/issues/9) | Bid scheduling + snipe execution (NTP-synchronized, soft-close handling, human approval gate) |
| [#13](https://git.opensourcesolarpunk.com/Circuit-Forge/snipe/issues/13) | Post-win workflow: payment routing, shipping coordination, provenance documentation |
### Already shipped
| Issue | Feature |
|-------|---------|
| [#1](https://git.opensourcesolarpunk.com/Circuit-Forge/snipe/issues/1) | SSE live score push — enriched data appears without re-search |
| [#2](https://git.opensourcesolarpunk.com/Circuit-Forge/snipe/issues/2) | eBay OAuth for full trust score access via Trading API |
| [#4](https://git.opensourcesolarpunk.com/Circuit-Forge/snipe/issues/4) | Community blocklist + batch eBay Trust & Safety reporting |
| [#6](https://git.opensourcesolarpunk.com/Circuit-Forge/snipe/issues/6) | Shared seller/scammer/comps DB across cloud users |
| [#8](https://git.opensourcesolarpunk.com/Circuit-Forge/snipe/issues/8) | "Triple Red" easter egg |
| [#11](https://git.opensourcesolarpunk.com/Circuit-Forge/snipe/issues/11) | Vision-based photo condition assessment — moondream2 / Claude vision |
| [#27](https://git.opensourcesolarpunk.com/Circuit-Forge/snipe/issues/27) | MCP server for Snipe search and scoring |
| [#29](https://git.opensourcesolarpunk.com/Circuit-Forge/snipe/issues/29) | LLM query builder — describe what to find, AI builds the search |
| [#47](https://git.opensourcesolarpunk.com/Circuit-Forge/snipe/issues/47) | Browser pool — pre-warm Chromium to cut scrape cold-start |
| [#48](https://git.opensourcesolarpunk.com/Circuit-Forge/snipe/issues/48) | Search result caching — skip redundant scrapes for repeated queries |
| [#49](https://git.opensourcesolarpunk.com/Circuit-Forge/snipe/issues/49) | Async search endpoint — return job ID immediately, scrape in background |
| [#50](https://git.opensourcesolarpunk.com/Circuit-Forge/snipe/issues/50) | Currency preference — display prices in user's preferred currency |
---

## Primary platforms (full vision)
- **eBay** — general + collectibles *(search + trust scoring: implemented)*
- **Mercari** — US resale marketplace *(search + partial trust scoring: implemented; comps Phase 3)*
- **CT Bids** — Connecticut state surplus and municipal auctions
- **GovPlanet / IronPlanet** — government surplus equipment
- **AuctionZip** — antique auction house aggregator (1,000+ houses)
- **Invaluable / LiveAuctioneers** — fine art and antiques
- **Bidsquare** — antiques and collectibles
- **HiBid** — estate auctions
- **Proxibid** — industrial and collector auctions

## Why auctions are hard
Online auctions are frustrating because:
- Winning requires being present at the exact closing moment — sometimes 2 AM
- Platforms vary wildly: some allow proxy bids, some don't; closing times extend on activity
- Scammers exploit auction urgency — new accounts, stolen photos, pressure to pay outside platform
- Price history is hidden — you don't know if an item is underpriced or a trap
- Sellers hide damage in descriptions rather than titles to avoid automated filters
- Shipping logistics for large / fragile antiques require coordination with the auction house
- Provenance documentation is inconsistent across auction houses

## Bidding strategy engine (planned)
- **Hard snipe**: submit bid N seconds before close (default: 8s)
- **Soft-close handling**: detect if platform extends on last-minute bids; adjust strategy
- **Proxy ladder**: set max and let the engine bid in increments, reserve snipe for final window
- **Reserve detection**: identify likely reserve price from bid history patterns
- **Comparable sales**: pull recent auction results for same/similar items across platforms

## Post-win workflow (planned)
1. Payment method routing (platform-specific: CC, wire, check)
2. Shipping quote requests to approved carriers (freight / large items via uShip; parcel via FedEx/UPS)
3. Condition report request from auction house
4. Provenance packet generation (for antiques / fine art resale or insurance)
5. Add to inventory (for dealers / collectors tracking portfolio value)

## Product code (license key)
`CFG-SNPE-XXXX-XXXX-XXXX`

## Tech notes
- Shared `circuitforge-core` scaffold (DB, LLM router, tier system, config)
- Platform adapters: eBay (Browse API + scraper) and Mercari (scraper); AuctionZip, Invaluable, HiBid, CT Bids planned (Playwright + API where available)
- Bid execution: Playwright automation with precise timing (NTP-synchronized)
- Soft-close detection: platform-specific rules engine
- Comparable sales: eBay completed listings via Marketplace Insights API + Browse API fallback
- Vision module: condition assessment from listing photos — moondream2 / Claude vision (paid tier stub in `app/trust/photo.py`)
- **Kasada/Cloudflare bypass**: headed Chromium via Xvfb with playwright-stealth; all scraping uses this path — headless and `requests`-based approaches are blocked by eBay and Mercari. Xvfb started with `-ac` (no X11 auth required in Docker), display range `:200+` to avoid host socket conflicts.

## Forgejo-primary
Snipe is developed and maintained on Forgejo at [git.opensourcesolarpunk.com/Circuit-Forge/snipe](https://git.opensourcesolarpunk.com/Circuit-Forge/snipe). GitHub and Codeberg are read-only mirrors. File issues and submit pull requests on Forgejo.

---

## Contributing
Bug reports and feature requests: open an issue on Forgejo. The discovery pipeline (scrapers, adapters, signal extraction) is MIT-licensed — pull requests welcome. AI trust-scoring features are BSL 1.1 — contributions are accepted but the license terms apply.

---

## License
Snipe uses a dual license:

| Component | License |
|-----------|---------|
| Discovery pipeline — scrapers, platform adapters, search, keyword filtering | [MIT](LICENSE-MIT) |
| LLM trust-scoring, query builder, vision assessment, AI features | [BSL 1.1](LICENSE-BSL) — free for personal non-commercial self-hosting; commercial use requires a paid license; converts to MIT after 4 years |

Humans own design, architecture, code review, testing, and verification. LLMs are part of our development workflow. [Our positions on LLM use →](https://circuitforge.tech/positions)
Privacy · Safety · Accessibility — co-equal, non-negotiable.
[circuitforge.tech](https://circuitforge.tech)


@ -33,6 +33,7 @@ from api.cloud_session import CloudUser, compute_features, get_session
from api.ebay_webhook import router as ebay_webhook_router from api.ebay_webhook import router as ebay_webhook_router
from app.db.models import SavedSearch as SavedSearchModel from app.db.models import SavedSearch as SavedSearchModel
from app.db.models import ScammerEntry from app.db.models import ScammerEntry
from app.db.protocol import SharedTableProtocol
from app.db.store import Store from app.db.store import Store
from app.platforms import SUPPORTED_PLATFORMS, SearchFilters from app.platforms import SUPPORTED_PLATFORMS, SearchFilters
from app.platforms.ebay.adapter import EbayAdapter from app.platforms.ebay.adapter import EbayAdapter
@ -142,6 +143,19 @@ def _get_community_store() -> "SnipeCommunityStore | None":
return _community_store return _community_store
# ── Shared Postgres backend (optional — active when SNIPE_SHARED_DB_URL is set) ─
# Replaces the SQLite shared.db for sellers, market_comps, reported_sellers, and
# scammer_blocklist. ThreadedConnectionPool is thread-safe; one instance per process.
_pg_shared_store: "SharedTableProtocol | None" = None
def _make_shared_store(path: Path) -> SharedTableProtocol:
"""Return the active shared backend — Postgres if configured, SQLite otherwise."""
if _pg_shared_store is not None:
return _pg_shared_store
return Store(path)
# ── LLM Query Builder singletons (optional — requires LLM backend) ──────────── # ── LLM Query Builder singletons (optional — requires LLM backend) ────────────
_category_cache = None _category_cache = None
_query_translator = None _query_translator = None
@ -153,7 +167,7 @@ def _get_query_translator():
@asynccontextmanager @asynccontextmanager
async def _lifespan(app: FastAPI): async def _lifespan(app: FastAPI):
global _community_store global _community_store, _pg_shared_store
# Pre-warm the Chromium browser pool so the first scrape request does not # Pre-warm the Chromium browser pool so the first scrape request does not
# pay the full cold-start cost (5-10s Xvfb + browser launch). # pay the full cold-start cost (5-10s Xvfb + browser launch).
# Pool size is controlled via BROWSER_POOL_SIZE env var (default: 2). # Pool size is controlled via BROWSER_POOL_SIZE env var (default: 2).
@ -178,6 +192,21 @@ async def _lifespan(app: FastAPI):
get_scheduler(sched_db) get_scheduler(sched_db)
log.info("Snipe task scheduler started (db=%s)", sched_db) log.info("Snipe task scheduler started (db=%s)", sched_db)
# Shared Postgres backend — optional. Replaces SQLite for sellers, market_comps,
# reported_sellers, and scammer_blocklist under concurrent load.
snipe_shared_dsn = os.environ.get("SNIPE_SHARED_DB_URL", "")
if snipe_shared_dsn:
try:
from app.db.pg_shared import SnipeSharedDB, SnipeSharedStore as _SnipeSharedStore
_pg_db = SnipeSharedDB(snipe_shared_dsn)
_pg_db.run_migrations()
_pg_shared_store = _SnipeSharedStore(_pg_db)
log.info("Shared Postgres backend ready (sellers, market_comps, blocklist)")
except Exception:
log.exception(
"SNIPE_SHARED_DB_URL set but Postgres init failed — falling back to SQLite"
)
# Community DB — optional. Skipped gracefully if COMMUNITY_DB_URL is unset. # Community DB — optional. Skipped gracefully if COMMUNITY_DB_URL is unset.
community_db_url = os.environ.get("COMMUNITY_DB_URL", "") community_db_url = os.environ.get("COMMUNITY_DB_URL", "")
if community_db_url: if community_db_url:
@ -209,13 +238,21 @@ async def _lifespan(app: FastAPI):
_category_cache.refresh(token_manager=None) # bootstrap fallback _category_cache.refresh(token_manager=None) # bootstrap fallback
try: try:
cforch_url = os.getenv("GPU_SERVER_URL") or os.getenv("CF_ORCH_URL") or None
if cforch_url:
_query_translator = QueryTranslator(
category_cache=_category_cache,
cforch_url=cforch_url,
)
log.info("LLM query builder ready (cf-orch).")
else:
from app.llm.router import LLMRouter from app.llm.router import LLMRouter
_llm_router = LLMRouter() _llm_router = LLMRouter()
_query_translator = QueryTranslator( _query_translator = QueryTranslator(
category_cache=_category_cache, category_cache=_category_cache,
llm_router=_llm_router, llm_router=_llm_router,
) )
log.info("LLM query builder ready.") log.info("LLM query builder ready (local LLM).")
except Exception: except Exception:
log.info("No LLM backend configured — query builder disabled.") log.info("No LLM backend configured — query builder disabled.")
except Exception: except Exception:
@ -438,7 +475,7 @@ def session_info(response: Response, session: CloudUser = Depends(get_session)):
def _trigger_scraper_enrichment( def _trigger_scraper_enrichment(
listings: list, listings: list,
shared_store: Store, shared_store: SharedTableProtocol,
shared_db: Path, shared_db: Path,
user_db: Path | None = None, user_db: Path | None = None,
query: str = "", query: str = "",
@ -504,7 +541,7 @@ def _trigger_scraper_enrichment(
if not session_id or session_id not in _update_queues: if not session_id or session_id not in _update_queues:
return return
q = _update_queues[session_id] q = _update_queues[session_id]
thread_shared = Store(shared_db) thread_shared = shared_store.clone()
thread_user = Store(user_db or shared_db) thread_user = Store(user_db or shared_db)
scorer = TrustScorer(thread_shared) scorer = TrustScorer(thread_shared)
comp = thread_shared.get_market_comp("ebay", hashlib.md5(query.encode()).hexdigest()) comp = thread_shared.get_market_comp("ebay", hashlib.md5(query.encode()).hexdigest())
@ -530,7 +567,7 @@ def _trigger_scraper_enrichment(
def _run(): def _run():
try: try:
enricher = ScrapedEbayAdapter(Store(shared_db)) enricher = ScrapedEbayAdapter(shared_store.clone())
if needs_btf: if needs_btf:
enricher.enrich_sellers_btf(needs_btf, max_workers=2) enricher.enrich_sellers_btf(needs_btf, max_workers=2)
log.info("BTF enrichment complete for %d sellers", len(needs_btf)) log.info("BTF enrichment complete for %d sellers", len(needs_btf))
@ -804,7 +841,7 @@ def search(
_update_queues[session_id] = _queue.SimpleQueue() _update_queues[session_id] = _queue.SimpleQueue()
try: try:
shared_store = Store(shared_db) shared_store = _make_shared_store(shared_db)
user_store = Store(user_db) user_store = Store(user_db)
# Re-hydrate Listing dataclass instances from the cached dicts so the # Re-hydrate Listing dataclass instances from the cached dicts so the
@ -889,13 +926,14 @@ def search(
_evict_expired_cache() _evict_expired_cache()
log.info("cache: miss key=%s q=%r", cache_key, q) log.info("cache: miss key=%s q=%r", cache_key, q)
# Each thread creates its own Store — sqlite3 check_same_thread=True. # Each thread creates its own store via clone() — sqlite3 check_same_thread=True;
# SnipeSharedStore.clone() returns self (ThreadedConnectionPool is thread-safe).
def _run_search(ebay_query: str) -> list: def _run_search(ebay_query: str) -> list:
return _make_adapter(Store(shared_db), adapter, platform=platform).search(ebay_query, base_filters) return _make_adapter(_make_shared_store(shared_db), adapter, platform=platform).search(ebay_query, base_filters)
def _run_comps() -> None: def _run_comps() -> None:
try: try:
_make_adapter(Store(shared_db), adapter, platform=platform).get_completed_sales(comp_query, pages) _make_adapter(_make_shared_store(shared_db), adapter, platform=platform).get_completed_sales(comp_query, pages)
except Exception: except Exception:
log.warning("comps: unhandled exception for %r", comp_query, exc_info=True) log.warning("comps: unhandled exception for %r", comp_query, exc_info=True)
@ -936,10 +974,9 @@ def search(
_update_queues[session_id] = _queue.SimpleQueue() _update_queues[session_id] = _queue.SimpleQueue()
try: try:
# Main-thread stores — fresh connections, same thread. # Main-thread stores — shared_store may be Postgres (sellers, market_comps);
# shared_store: sellers, market_comps (all users share this data) # user_store is always per-user SQLite (listings, trust_scores, saved_searches).
# user_store: listings, saved_searches (per-user in cloud mode, same file in local mode) shared_store = _make_shared_store(shared_db)
shared_store = Store(shared_db)
user_store = Store(user_db) user_store = Store(user_db)
user_store.save_listings(listings) user_store.save_listings(listings)
@ -1199,7 +1236,7 @@ def search_async(
cached_listings_raw = payload["listings"] cached_listings_raw = payload["listings"]
cached_market_price = payload["market_price"] cached_market_price = payload["market_price"]
try: try:
shared_store = Store(_shared_db) shared_store = _make_shared_store(_shared_db)
user_store = Store(_user_db) user_store = Store(_user_db)
listings = [_Listing(**d) for d in cached_listings_raw] listings = [_Listing(**d) for d in cached_listings_raw]
user_store.save_listings(listings) user_store.save_listings(listings)
@ -1279,11 +1316,11 @@ def search_async(
try: try:
def _run_search(ebay_query: str) -> list: def _run_search(ebay_query: str) -> list:
return _make_adapter(Store(_shared_db), adapter, platform=platform).search(ebay_query, base_filters) return _make_adapter(_make_shared_store(_shared_db), adapter, platform=platform).search(ebay_query, base_filters)
def _run_comps() -> None: def _run_comps() -> None:
try: try:
_make_adapter(Store(_shared_db), adapter, platform=platform).get_completed_sales(comp_query, pages) _make_adapter(_make_shared_store(_shared_db), adapter, platform=platform).get_completed_sales(comp_query, pages)
except Exception: except Exception:
log.warning("async comps: unhandled exception for %r", comp_query, exc_info=True) log.warning("async comps: unhandled exception for %r", comp_query, exc_info=True)
@ -1306,7 +1343,7 @@ def search_async(
platform, _auth_label(_user_id), _tier, adapter_used, pages, len(listings), q_norm, platform, _auth_label(_user_id), _tier, adapter_used, pages, len(listings), q_norm,
) )
shared_store = Store(_shared_db) shared_store = _make_shared_store(_shared_db)
user_store = Store(_user_db) user_store = Store(_user_db)
user_store.save_listings(listings) user_store.save_listings(listings)
@ -1465,7 +1502,7 @@ def enrich_seller(
""" """
import threading import threading
shared_store = Store(session.shared_db) shared_store = _make_shared_store(session.shared_db)
user_store = Store(session.user_db) user_store = Store(session.user_db)
shared_db = session.shared_db shared_db = session.shared_db
@ -1494,7 +1531,7 @@ def enrich_seller(
def _btf(): def _btf():
try: try:
ScrapedEbayAdapter(Store(shared_db)).enrich_sellers_btf( ScrapedEbayAdapter(shared_store.clone()).enrich_sellers_btf(
{seller: listing_id}, max_workers=1 {seller: listing_id}, max_workers=1
) )
except Exception as e: except Exception as e:
@ -1502,7 +1539,7 @@ def enrich_seller(
def _ssn(): def _ssn():
try: try:
ScrapedEbayAdapter(Store(shared_db)).enrich_sellers_categories( ScrapedEbayAdapter(shared_store.clone()).enrich_sellers_categories(
[seller], max_workers=1 [seller], max_workers=1
) )
except Exception as e: except Exception as e:
@ -1773,7 +1810,7 @@ class BlocklistAdd(BaseModel):
@app.get("/api/blocklist") @app.get("/api/blocklist")
def list_blocklist(session: CloudUser = Depends(get_session)): def list_blocklist(session: CloudUser = Depends(get_session)):
store = Store(session.shared_db) store = _make_shared_store(session.shared_db)
return {"entries": [dataclasses.asdict(e) for e in store.list_blocklist()]} return {"entries": [dataclasses.asdict(e) for e in store.list_blocklist()]}
@ -1784,7 +1821,7 @@ def add_to_blocklist(body: BlocklistAdd, session: CloudUser = Depends(get_sessio
status_code=403, status_code=403,
detail="Sign in to report sellers to the community blocklist.", detail="Sign in to report sellers to the community blocklist.",
) )
store = Store(session.shared_db) store = _make_shared_store(session.shared_db)
entry = store.add_to_blocklist(ScammerEntry( entry = store.add_to_blocklist(ScammerEntry(
platform=body.platform, platform=body.platform,
platform_seller_id=body.platform_seller_id, platform_seller_id=body.platform_seller_id,
@ -1818,13 +1855,13 @@ def add_to_blocklist(body: BlocklistAdd, session: CloudUser = Depends(get_sessio
@app.delete("/api/blocklist/{platform_seller_id}", status_code=204) @app.delete("/api/blocklist/{platform_seller_id}", status_code=204)
def remove_from_blocklist(platform_seller_id: str, session: CloudUser = Depends(get_session)): def remove_from_blocklist(platform_seller_id: str, session: CloudUser = Depends(get_session)):
Store(session.shared_db).remove_from_blocklist("ebay", platform_seller_id) _make_shared_store(session.shared_db).remove_from_blocklist("ebay", platform_seller_id)
@app.get("/api/blocklist/export") @app.get("/api/blocklist/export")
def export_blocklist(session: CloudUser = Depends(get_session)): def export_blocklist(session: CloudUser = Depends(get_session)):
"""Download the blocklist as a CSV file.""" """Download the blocklist as a CSV file."""
entries = Store(session.shared_db).list_blocklist() entries = _make_shared_store(session.shared_db).list_blocklist()
buf = io.StringIO() buf = io.StringIO()
writer = csv.writer(buf) writer = csv.writer(buf)
writer.writerow(["platform", "platform_seller_id", "username", "reason", "source", "created_at"]) writer.writerow(["platform", "platform_seller_id", "username", "reason", "source", "created_at"])
@ -1856,7 +1893,7 @@ async def import_blocklist(
except UnicodeDecodeError: except UnicodeDecodeError:
raise HTTPException(status_code=400, detail="File must be UTF-8 encoded") raise HTTPException(status_code=400, detail="File must be UTF-8 encoded")
store = Store(session.shared_db) store = _make_shared_store(session.shared_db)
imported = 0 imported = 0
errors: list[str] = [] errors: list[str] = []
reader = csv.DictReader(io.StringIO(text)) reader = csv.DictReader(io.StringIO(text))
@ -2005,7 +2042,7 @@ async def build_search_query(
if translator is None: if translator is None:
raise HTTPException( raise HTTPException(
status_code=503, status_code=503,
detail="No LLM backend configured. Set OLLAMA_HOST, ANTHROPIC_API_KEY, or OPENAI_API_KEY.", detail="No LLM backend configured. Set CF_ORCH_URL (cloud) or OLLAMA_HOST / ANTHROPIC_API_KEY / OPENAI_API_KEY (local).",
) )
from app.llm.query_translator import QueryTranslatorError from app.llm.query_translator import QueryTranslatorError
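
The backend selection in this file can be summarised with a small sketch. It is not the project's code: `SqliteBackend`, `PooledBackend`, `init_backend`, and `make_shared_store` are hypothetical stand-ins for `Store`, `SnipeSharedStore`, the lifespan init step, and `_make_shared_store`, and the environment is passed in as a plain dict so the example stays self-contained.

```python
from __future__ import annotations

from typing import Protocol


class SharedBackend(Protocol):
    """Hypothetical stand-in for SharedTableProtocol (only clone() is shown)."""

    def clone(self) -> "SharedBackend": ...


class SqliteBackend:
    """Stand-in for Store: connections are per thread, so clone() builds a new one."""

    def __init__(self, path: str) -> None:
        self.path = path

    def clone(self) -> "SqliteBackend":
        return SqliteBackend(self.path)


class PooledBackend:
    """Stand-in for SnipeSharedStore: pool-backed, so clone() can return self."""

    def __init__(self, dsn: str) -> None:
        self.dsn = dsn

    def clone(self) -> "PooledBackend":
        return self


_pooled: PooledBackend | None = None


def init_backend(env: dict[str, str]) -> None:
    """Build the pooled backend only when a DSN is configured (assumed env key)."""
    global _pooled
    dsn = env.get("SNIPE_SHARED_DB_URL", "")
    _pooled = PooledBackend(dsn) if dsn else None


def make_shared_store(path: str) -> SharedBackend:
    """Return the pooled backend when configured, otherwise a fresh SQLite store."""
    if _pooled is not None:
        return _pooled
    return SqliteBackend(path)
```

With this shape, call sites never branch on the backend: they ask for a store and call `clone()` before handing it to a worker thread, and the SQLite path remains the default when no DSN is set.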


@ -0,0 +1,49 @@
-- Snipe shared tables: sellers, market_comps, reported_sellers, scammer_blocklist
-- Replaces the equivalent tables in shared.db (SQLite).
-- Per-user tables (listings, trust_scores, saved_searches) remain in SQLite.
CREATE TABLE IF NOT EXISTS sellers (
id BIGSERIAL PRIMARY KEY,
platform TEXT NOT NULL,
platform_seller_id TEXT NOT NULL,
username TEXT NOT NULL,
account_age_days INTEGER,
feedback_count INTEGER NOT NULL DEFAULT 0,
feedback_ratio DOUBLE PRECISION NOT NULL DEFAULT 0,
category_history_json TEXT NOT NULL DEFAULT '{}',
fetched_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (platform, platform_seller_id)
);
CREATE TABLE IF NOT EXISTS market_comps (
id BIGSERIAL PRIMARY KEY,
platform TEXT NOT NULL,
query_hash TEXT NOT NULL,
median_price DOUBLE PRECISION NOT NULL,
sample_count INTEGER NOT NULL,
fetched_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
expires_at TIMESTAMPTZ NOT NULL,
UNIQUE (platform, query_hash)
);
CREATE TABLE IF NOT EXISTS reported_sellers (
id BIGSERIAL PRIMARY KEY,
platform TEXT NOT NULL,
platform_seller_id TEXT NOT NULL,
username TEXT,
reported_by TEXT NOT NULL DEFAULT 'user',
reported_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (platform, platform_seller_id)
);
CREATE TABLE IF NOT EXISTS scammer_blocklist (
id BIGSERIAL PRIMARY KEY,
platform TEXT NOT NULL,
platform_seller_id TEXT NOT NULL,
username TEXT NOT NULL,
reason TEXT,
source TEXT NOT NULL DEFAULT 'manual',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (platform, platform_seller_id)
);
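
The `UNIQUE (platform, platform_seller_id)` constraints above are what allow the store to upsert rather than insert blindly. The sketch below illustrates that behaviour under one loud assumption: it uses an in-memory SQLite database as a stand-in for Postgres (SQLite 3.24+ accepts the same `ON CONFLICT ... DO UPDATE` form), and the table is trimmed to a few columns. `upsert_seller` is a hypothetical helper, not a function from the repository.

```python
import sqlite3

# In-memory SQLite stands in for Postgres here; this is an assumption made
# only so the example runs without a database server.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE sellers (
        id INTEGER PRIMARY KEY,
        platform TEXT NOT NULL,
        platform_seller_id TEXT NOT NULL,
        username TEXT NOT NULL,
        account_age_days INTEGER,
        UNIQUE (platform, platform_seller_id)
    )
    """
)

UPSERT = """
    INSERT INTO sellers (platform, platform_seller_id, username, account_age_days)
    VALUES (?, ?, ?, ?)
    ON CONFLICT (platform, platform_seller_id) DO UPDATE SET
        username = excluded.username,
        account_age_days = COALESCE(excluded.account_age_days, sellers.account_age_days)
"""


def upsert_seller(platform, seller_id, username, age_days):
    """Insert a seller, or update it in place if the unique key already exists."""
    conn.execute(UPSERT, (platform, seller_id, username, age_days))
    conn.commit()


# The first write stores the account age; the second omits it (None), so
# COALESCE keeps the previously stored value while the username is refreshed.
upsert_seller("ebay", "s1", "alice", 400)
upsert_seller("ebay", "s1", "alice_renamed", None)
rows = conn.execute("SELECT username, account_age_days FROM sellers").fetchall()
```

The `COALESCE` keeps a known account age from being overwritten by a later fetch that could not determine it.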


app/db/pg_shared.py Normal file

@ -0,0 +1,380 @@
from __future__ import annotations
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
import psycopg2
from psycopg2.pool import ThreadedConnectionPool
from app.db.models import MarketComp, ScammerEntry, Seller
log = logging.getLogger(__name__)
_MIN_CONN = 2
_MAX_CONN = 20
class SnipeSharedDB:
"""Thread-safe Postgres connection pool for Snipe shared tables."""
def __init__(self, dsn: str) -> None:
self._pool = ThreadedConnectionPool(_MIN_CONN, _MAX_CONN, dsn=dsn)
def getconn(self):
return self._pool.getconn()
def putconn(self, conn) -> None:
self._pool.putconn(conn)
def close(self) -> None:
self._pool.closeall()
def run_migrations(self) -> None:
"""Apply pg_migrations/*.sql in filename order. Idempotent."""
migrations_dir = Path(__file__).parent / "pg_migrations"
files = sorted(migrations_dir.glob("*.sql"), key=lambda p: p.name)
conn = self.getconn()
try:
with conn.cursor() as cur:
cur.execute("""
CREATE TABLE IF NOT EXISTS _snipe_shared_migrations (
filename TEXT PRIMARY KEY,
applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)
""")
conn.commit()
for f in files:
cur.execute(
"SELECT 1 FROM _snipe_shared_migrations WHERE filename = %s",
(f.name,),
)
if cur.fetchone():
continue
log.info("Applying migration: %s", f.name)
cur.execute(f.read_text())
cur.execute(
"INSERT INTO _snipe_shared_migrations (filename) VALUES (%s)",
(f.name,),
)
conn.commit()
except Exception:
conn.rollback()
raise
finally:
self.putconn(conn)
class SnipeSharedStore:
"""Postgres-backed store for sellers, market_comps, reported_sellers, and scammer_blocklist.
Satisfies SharedTableProtocol. clone() returns self because ThreadedConnectionPool
is already thread-safe, so no new instance is needed per thread.
"""
def __init__(self, db: SnipeSharedDB) -> None:
self._db = db
def clone(self) -> "SnipeSharedStore":
return self
# Sellers
def save_seller(self, seller: Seller) -> None:
self.save_sellers([seller])
def save_sellers(self, sellers: list[Seller]) -> None:
if not sellers:
return
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.executemany(
"""
INSERT INTO sellers
(platform, platform_seller_id, username, account_age_days,
feedback_count, feedback_ratio, category_history_json)
VALUES (%s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (platform, platform_seller_id) DO UPDATE SET
username = EXCLUDED.username,
feedback_count = EXCLUDED.feedback_count,
feedback_ratio = EXCLUDED.feedback_ratio,
account_age_days = COALESCE(
EXCLUDED.account_age_days,
sellers.account_age_days
),
category_history_json = COALESCE(
NULLIF(NULLIF(EXCLUDED.category_history_json, '{}'), ''),
NULLIF(NULLIF(sellers.category_history_json, '{}'), ''),
'{}'
),
fetched_at = NOW()
""",
[
(s.platform, s.platform_seller_id, s.username, s.account_age_days,
s.feedback_count, s.feedback_ratio, s.category_history_json or "{}")
for s in sellers
],
)
conn.commit()
except Exception:
conn.rollback()
raise
finally:
self._db.putconn(conn)
def get_seller(self, platform: str, platform_seller_id: str) -> Optional[Seller]:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"""
SELECT platform, platform_seller_id, username, account_age_days,
feedback_count, feedback_ratio, category_history_json,
id, fetched_at
FROM sellers
WHERE platform = %s AND platform_seller_id = %s
""",
(platform, platform_seller_id),
)
row = cur.fetchone()
if not row:
return None
return Seller(*row[:7], id=row[7], fetched_at=str(row[8]))
finally:
self._db.putconn(conn)
def delete_seller_data(self, platform: str, platform_seller_id: str) -> None:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"DELETE FROM sellers WHERE platform = %s AND platform_seller_id = %s",
(platform, platform_seller_id),
)
conn.commit()
except Exception:
conn.rollback()
raise
finally:
self._db.putconn(conn)
# MarketComps
def save_market_comp(self, comp: MarketComp) -> None:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO market_comps
(platform, query_hash, median_price, sample_count, expires_at)
VALUES (%s, %s, %s, %s, %s::TIMESTAMPTZ)
ON CONFLICT (platform, query_hash) DO UPDATE SET
median_price = EXCLUDED.median_price,
sample_count = EXCLUDED.sample_count,
expires_at = EXCLUDED.expires_at,
fetched_at = NOW()
""",
(comp.platform, comp.query_hash, comp.median_price,
comp.sample_count, comp.expires_at),
)
conn.commit()
except Exception:
conn.rollback()
raise
finally:
self._db.putconn(conn)
def get_market_comp(self, platform: str, query_hash: str) -> Optional[MarketComp]:
now = datetime.now(timezone.utc).isoformat()
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"""
SELECT platform, query_hash, median_price, sample_count,
expires_at, id, fetched_at
FROM market_comps
WHERE platform = %s AND query_hash = %s AND expires_at > %s::TIMESTAMPTZ
""",
(platform, query_hash, now),
)
row = cur.fetchone()
if not row:
return None
return MarketComp(*row[:5], id=row[5], fetched_at=str(row[6]))
finally:
self._db.putconn(conn)
# Reported Sellers
def mark_reported(
self,
platform: str,
platform_seller_id: str,
username: Optional[str] = None,
reported_by: str = "user",
) -> None:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO reported_sellers
(platform, platform_seller_id, username, reported_by)
VALUES (%s, %s, %s, %s)
ON CONFLICT (platform, platform_seller_id) DO NOTHING
""",
(platform, platform_seller_id, username, reported_by),
)
conn.commit()
except Exception:
conn.rollback()
raise
finally:
self._db.putconn(conn)
def list_reported(self, platform: str = "ebay") -> list[str]:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"SELECT platform_seller_id FROM reported_sellers WHERE platform = %s",
(platform,),
)
return [row[0] for row in cur.fetchall()]
finally:
self._db.putconn(conn)
# Seller Category Refresh
def refresh_seller_categories(
self,
platform: str,
seller_ids: list[str],
listing_store=None, # always a SQLite Store in practice
) -> int:
"""Derive category_history_json from listing data and update sellers in Postgres.
listing_store must be provided (in practice it is the per-user SQLite Store);
when it is None, no sellers are updated and 0 is returned.
Returns the count of sellers updated.
"""
from app.platforms.ebay.scraper import _classify_category_label # lazy to avoid circular
import json
if not seller_ids or listing_store is None:
return 0
updated = 0
for sid in seller_ids:
seller = self.get_seller(platform, sid)
if not seller or seller.category_history_json not in ("{}", "", None):
continue
# listing_store is always a SQLite Store; access _conn directly for the query.
rows = listing_store._conn.execute(
"SELECT category_name, COUNT(*) FROM listings "
"WHERE platform=? AND seller_platform_id=? AND category_name IS NOT NULL "
"GROUP BY category_name",
(platform, sid),
).fetchall()
if not rows:
continue
counts: dict[str, int] = {}
for cat_name, cnt in rows:
key = _classify_category_label(cat_name)
if key:
counts[key] = counts.get(key, 0) + cnt
if counts:
from dataclasses import replace
self.save_sellers([replace(seller, category_history_json=json.dumps(counts))])
updated += 1
return updated
# Scammer Blocklist
def is_blocklisted(self, platform: str, platform_seller_id: str) -> bool:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"SELECT 1 FROM scammer_blocklist "
"WHERE platform = %s AND platform_seller_id = %s LIMIT 1",
(platform, platform_seller_id),
)
return cur.fetchone() is not None
finally:
self._db.putconn(conn)
def add_to_blocklist(self, entry: ScammerEntry) -> ScammerEntry:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO scammer_blocklist
(platform, platform_seller_id, username, reason, source)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (platform, platform_seller_id) DO UPDATE SET
username = EXCLUDED.username,
reason = COALESCE(EXCLUDED.reason, scammer_blocklist.reason),
source = EXCLUDED.source
""",
(entry.platform, entry.platform_seller_id, entry.username,
entry.reason, entry.source),
)
conn.commit()
cur.execute(
"SELECT id, created_at FROM scammer_blocklist "
"WHERE platform = %s AND platform_seller_id = %s",
(entry.platform, entry.platform_seller_id),
)
row = cur.fetchone()
from dataclasses import replace
return replace(entry, id=row[0], created_at=str(row[1]))
except Exception:
conn.rollback()
raise
finally:
self._db.putconn(conn)
def remove_from_blocklist(self, platform: str, platform_seller_id: str) -> None:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"DELETE FROM scammer_blocklist "
"WHERE platform = %s AND platform_seller_id = %s",
(platform, platform_seller_id),
)
conn.commit()
except Exception:
conn.rollback()
raise
finally:
self._db.putconn(conn)
def list_blocklist(self, platform: str = "ebay") -> list[ScammerEntry]:
conn = self._db.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"""
SELECT platform, platform_seller_id, username, reason, source, id, created_at
FROM scammer_blocklist
WHERE platform = %s
ORDER BY created_at DESC
""",
(platform,),
)
return [
ScammerEntry(
platform=r[0], platform_seller_id=r[1], username=r[2],
reason=r[3], source=r[4], id=r[5], created_at=str(r[6]),
)
for r in cur.fetchall()
]
finally:
self._db.putconn(conn)
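
Every method in this file repeats the same checkout, commit or rollback, and return sequence. One way to factor that out is a context manager, sketched below as a possible refactor rather than anything present in the diff. `pooled_conn` is a hypothetical helper, and `FakeConn` / `FakePool` are stand-ins for a psycopg2 connection and `SnipeSharedDB` so the example runs without a database.

```python
from contextlib import contextmanager


class FakeConn:
    """Hypothetical stand-in for a psycopg2 connection; it only records calls."""

    def __init__(self):
        self.calls = []

    def commit(self):
        self.calls.append("commit")

    def rollback(self):
        self.calls.append("rollback")


class FakePool:
    """Hypothetical stand-in for SnipeSharedDB (getconn/putconn only)."""

    def __init__(self):
        self.conn = FakeConn()
        self.checked_out = 0

    def getconn(self):
        self.checked_out += 1
        return self.conn

    def putconn(self, conn):
        self.checked_out -= 1


@contextmanager
def pooled_conn(db, *, write=False):
    """Check out a connection, commit on success if write=True, always return it."""
    conn = db.getconn()
    try:
        yield conn
        if write:
            conn.commit()
    except Exception:
        # Undo any partial work before the connection goes back to the pool.
        conn.rollback()
        raise
    finally:
        db.putconn(conn)
```

A write method would then shrink to `with pooled_conn(self._db, write=True) as conn:` followed by the cursor work, and the connection is returned to the pool on both the success and the failure path.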

app/db/protocol.py Normal file

@ -0,0 +1,86 @@
"""Protocol (duck-type interface) for shared table backends (SQLite and Postgres)."""
from __future__ import annotations
from typing import Any, Optional, Protocol, runtime_checkable
from app.db.models import MarketComp, ScammerEntry, Seller
@runtime_checkable
class SharedTableProtocol(Protocol):
"""Protocol that both Store (SQLite) and SnipeSharedStore (Postgres) must satisfy.
This enables code that reads/writes shared tables (sellers, market_comps,
reported_sellers, scammer_blocklist) to remain agnostic to the underlying backend.
"""
def save_seller(self, seller: Seller) -> None:
"""Persist a single seller record."""
...
def save_sellers(self, sellers: list[Seller]) -> None:
"""Persist multiple seller records (batch upsert)."""
...
def get_seller(self, platform: str, platform_seller_id: str) -> Optional[Seller]:
"""Fetch a single seller by platform and platform_seller_id."""
...
def save_market_comp(self, comp: MarketComp) -> None:
"""Persist a market comparison record."""
...
def get_market_comp(self, platform: str, query_hash: str) -> Optional[MarketComp]:
"""Fetch a market comparison by platform and query_hash."""
...
def mark_reported(
self,
platform: str,
platform_seller_id: str,
username: Optional[str] = None,
reported_by: str = "user",
) -> None:
"""Record that a seller has been reported to the platform."""
...
def list_reported(self, platform: str = "ebay") -> list[str]:
"""Return all platform_seller_ids that have been reported."""
...
def delete_seller_data(self, platform: str, platform_seller_id: str) -> None:
"""Permanently erase a seller and all related data (GDPR/eBay compliance)."""
...
def refresh_seller_categories(
self,
platform: str,
seller_ids: list[str],
listing_store: Optional[Any] = None,
) -> int:
"""Derive category_history_json for sellers that lack it from stored listings.
listing_store: Store holding listings (may differ from self in split-DB mode).
Returns count of sellers updated.
"""
...
def is_blocklisted(self, platform: str, platform_seller_id: str) -> bool:
"""Return True if a seller is on the community scammer blocklist."""
...
def add_to_blocklist(self, entry: ScammerEntry) -> ScammerEntry:
"""Upsert a seller into the blocklist. Returns the saved entry with id and created_at."""
...
def remove_from_blocklist(self, platform: str, platform_seller_id: str) -> None:
"""Remove a seller from the blocklist."""
...
def list_blocklist(self, platform: str = "ebay") -> list[ScammerEntry]:
"""Return all blocklisted sellers for a platform, newest first."""
...
def clone(self) -> SharedTableProtocol:
"""Return a store that is safe to use from the calling thread: a new instance for SQLite, or the same pooled instance for Postgres."""
...
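
Reviewer sketch (not part of the commit): the `clone()` method is what lets worker threads take their own handle on the shared backend without knowing whether it is SQLite or Postgres. The example below is a minimal illustration, assuming a two-method subset of the protocol; `Cloneable`, `InMemoryStore`, and `check_in_worker` are hypothetical names and do not exist in the repository.

```python
from __future__ import annotations

import threading
from typing import Protocol, runtime_checkable


@runtime_checkable
class Cloneable(Protocol):
    """Hypothetical two-method subset of SharedTableProtocol."""

    def is_blocklisted(self, platform: str, platform_seller_id: str) -> bool: ...

    def clone(self) -> "Cloneable": ...


class InMemoryStore:
    """Toy backend: clones share one underlying set, like two connections to one DB."""

    def __init__(self, blocked: set[tuple[str, str]]) -> None:
        self._blocked = blocked

    def is_blocklisted(self, platform: str, platform_seller_id: str) -> bool:
        return (platform, platform_seller_id) in self._blocked

    def clone(self) -> "InMemoryStore":
        # New instance, same backend state.
        return InMemoryStore(self._blocked)


def check_in_worker(store: Cloneable, seller_id: str) -> bool:
    """Run a lookup on a worker thread using that thread's own clone."""
    result: list[bool] = []

    def _work() -> None:
        thread_store = store.clone()  # never share the caller's instance
        result.append(thread_store.is_blocklisted("ebay", seller_id))

    worker = threading.Thread(target=_work)
    worker.start()
    worker.join()
    return result[0]


shared = InMemoryStore({("ebay", "bad_seller")})
```

Because the protocol is structural, `InMemoryStore` satisfies it without inheriting from it; the real `Store` and `SnipeSharedStore` would presumably conform the same way.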

View file

@ -21,6 +21,10 @@ class Store:
# WAL mode: allows concurrent readers + one writer without blocking # WAL mode: allows concurrent readers + one writer without blocking
self._conn.execute("PRAGMA journal_mode=WAL") self._conn.execute("PRAGMA journal_mode=WAL")
def clone(self) -> Store:
"""Create a new independent instance pointing to the same database."""
return Store(self._db_path)
# --- Seller --- # --- Seller ---
def delete_seller_data(self, platform: str, platform_seller_id: str) -> None: def delete_seller_data(self, platform: str, platform_seller_id: str) -> None:

View file

@ -2,9 +2,15 @@
# BSL 1.1 License # BSL 1.1 License
"""LLM query builder — translates natural language to eBay SearchFilters. """LLM query builder — translates natural language to eBay SearchFilters.
The QueryTranslator calls LLMRouter.complete() (synchronous) with a domain-aware Supports two backends, selected at construction time:
system prompt. The prompt includes category hints injected from EbayCategoryCache.
The LLM returns a single JSON object matching SearchParamsResponse. cforch_url cf-orch task endpoint (cloud/premium). The coordinator resolves
product+task to a model and returns an allocation. The caller
POSTs to the allocated service URL, then DELETEs the allocation.
llm_router circuitforge_core.LLMRouter (local installs: ollama/vllm/api keys).
Exactly one of cforch_url or llm_router must be supplied.
""" """
from __future__ import annotations from __future__ import annotations
@ -13,6 +19,8 @@ import logging
from dataclasses import dataclass from dataclasses import dataclass
from typing import TYPE_CHECKING, Optional from typing import TYPE_CHECKING, Optional
import httpx
if TYPE_CHECKING: if TYPE_CHECKING:
from app.platforms.ebay.categories import EbayCategoryCache from app.platforms.ebay.categories import EbayCategoryCache
@ -128,11 +136,23 @@ class QueryTranslator:
Args: Args:
category_cache: An EbayCategoryCache instance (may have empty cache). category_cache: An EbayCategoryCache instance (may have empty cache).
llm_router: An LLMRouter instance from circuitforge_core. cforch_url: cf-orch coordinator base URL (cloud/premium path).
llm_router: A circuitforge_core LLMRouter instance (local path).
Exactly one of cforch_url or llm_router must be provided.
""" """
def __init__(self, category_cache: "EbayCategoryCache", llm_router: object) -> None: def __init__(
self,
category_cache: "EbayCategoryCache",
*,
cforch_url: str | None = None,
llm_router: object | None = None,
) -> None:
if cforch_url is None and llm_router is None:
raise ValueError("Either cforch_url or llm_router must be provided")
self._cache = category_cache self._cache = category_cache
self._cforch_url = cforch_url
self._llm_router = llm_router self._llm_router = llm_router
def translate(self, natural_language: str) -> SearchParamsResponse: def translate(self, natural_language: str) -> SearchParamsResponse:
@ -154,14 +174,58 @@ class QueryTranslator:
system_prompt = _SYSTEM_PROMPT_TEMPLATE.format(category_hints=category_hints) system_prompt = _SYSTEM_PROMPT_TEMPLATE.format(category_hints=category_hints)
try: try:
raw = self._llm_router.complete( if self._cforch_url:
natural_language, raw = self._call_orch(system_prompt, natural_language)
system=system_prompt, else:
max_tokens=512, raw = self._call_local(system_prompt, natural_language)
) except QueryTranslatorError:
raise
except Exception as exc: except Exception as exc:
raise QueryTranslatorError( raise QueryTranslatorError(
f"LLM backend error: {exc}", raw="" f"LLM backend error: {exc}", raw=""
) from exc ) from exc
return _parse_response(raw) return _parse_response(raw)
def _call_orch(self, system_prompt: str, user_message: str) -> str:
"""Allocate via cf-orch task endpoint, call the model, release the slot."""
alloc_resp = httpx.post(
f"{self._cforch_url}/api/inference/task",
json={"product": "snipe", "task": "query_translation"},
timeout=10.0,
)
alloc_resp.raise_for_status()
alloc = alloc_resp.json()
service_url = alloc["url"]
allocation_id = alloc["allocation_id"]
try:
resp = httpx.post(
f"{service_url}/v1/chat/completions",
json={
"model": "__auto__",
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_message},
],
"max_tokens": 512,
},
timeout=60.0,
)
resp.raise_for_status()
return resp.json()["choices"][0]["message"]["content"]
finally:
try:
httpx.delete(
f"{self._cforch_url}/api/services/cf-text/allocations/{allocation_id}",
timeout=5.0,
)
except Exception:
log.warning("Failed to release cf-orch allocation %s", allocation_id)
def _call_local(self, system_prompt: str, user_message: str) -> str:
"""Call the locally-configured LLMRouter (ollama/vllm/api keys)."""
return self._llm_router.complete( # type: ignore[union-attr]
user_message,
system=system_prompt,
max_tokens=512,
)
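
Reviewer sketch (not part of the commit): the docstring states that exactly one of `cforch_url` or `llm_router` must be supplied, while the constructor in this diff rejects only the case where both are missing, and `translate()` takes the cf-orch path when both are set. If strict enforcement is wanted, a check along these lines could work. `select_backend` is a hypothetical helper, not a function in the codebase.

```python
from __future__ import annotations


def select_backend(
    cforch_url: str | None = None,
    llm_router: object | None = None,
) -> str:
    """Return "orch" or "local"; reject zero or two backends (hypothetical helper)."""
    supplied = [cforch_url is not None, llm_router is not None]
    if sum(supplied) != 1:
        raise ValueError("Exactly one of cforch_url or llm_router must be provided")
    return "orch" if cforch_url is not None else "local"
```

Whether passing both should be an error or should silently prefer cf-orch is a design choice for the authors; the sketch only shows the stricter reading of the docstring.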

View file

@ -6,7 +6,7 @@ Snipe LLMRouter shim — tri-level config path priority.
Config lookup order: Config lookup order:
1. <repo>/config/llm.yaml per-install local override 1. <repo>/config/llm.yaml per-install local override
2. ~/.config/circuitforge/llm.yaml user-level config (circuitforge-core default) 2. ~/.config/circuitforge/llm.yaml user-level config (circuitforge-core default)
3. env-var auto-config (ANTHROPIC_API_KEY, OPENAI_API_KEY, OLLAMA_HOST, CF_ORCH_URL) 3. env-var auto-config (ANTHROPIC_API_KEY, OPENAI_API_KEY, OLLAMA_HOST, GPU_SERVER_URL)
""" """
from pathlib import Path from pathlib import Path

View file

@ -22,7 +22,7 @@ _SHOPPING_API_INTER_REQUEST_DELAY = 0.5 # seconds between successive calls
_SELLER_ENRICH_TTL_HOURS = 24 # skip re-enrichment within this window _SELLER_ENRICH_TTL_HOURS = 24 # skip re-enrichment within this window
from app.db.models import Listing, MarketComp, Seller from app.db.models import Listing, MarketComp, Seller
from app.db.store import Store from app.db.protocol import SharedTableProtocol
from app.platforms import PlatformAdapter, SearchFilters from app.platforms import PlatformAdapter, SearchFilters
from app.platforms.ebay.auth import EbayTokenManager from app.platforms.ebay.auth import EbayTokenManager
from app.platforms.ebay.normaliser import normalise_listing, normalise_seller from app.platforms.ebay.normaliser import normalise_listing, normalise_seller
@ -67,7 +67,7 @@ BROWSE_BASE = {
class EbayAdapter(PlatformAdapter): class EbayAdapter(PlatformAdapter):
def __init__(self, token_manager: EbayTokenManager, shared_store: Store, env: str = "production"): def __init__(self, token_manager: EbayTokenManager, shared_store: SharedTableProtocol, env: str = "production"):
self._tokens = token_manager self._tokens = token_manager
self._store = shared_store self._store = shared_store
self._env = env self._env = env

View file

@ -1,60 +1,58 @@
"""Pre-warmed Chromium browser pool for the eBay scraper. """Thread-local Playwright browser manager for the eBay scraper.
Eliminates cold-start latency (5-10s per call) by keeping a small pool of Each uvicorn worker thread that calls fetch_html() gets its own Playwright
long-lived Playwright browser instances with fresh contexts ready to serve. instance, browser, and context created lazily on first use. This avoids
the "cannot switch to a different thread" error that arises when Playwright
sync API instances are shared across threads (they bind their greenlet event
loop to the creating thread).
Key design: Key design:
- Pool slots: ``(xvfb_proc, pw_instance, browser, context, display_num, last_used_ts)`` - Thread-local: _thread_local.slot holds the _PooledBrowser for the current
One headed Chromium browser per slot keeps the Kasada fingerprint clean. thread. No slot is ever handed to another thread.
- Display numbering: :200-:399 (avoids host :0 and low-numbered kernel socket conflicts). - Lazy creation: slots are created on first fetch_html() call per thread, not
- Thread safety: ``queue.Queue`` with blocking get (timeout=3s before fresh fallback). at startup. start() is a lightweight lifecycle marker only.
- Replenishment: after each use, the dirty context is closed and a new context is - Registry: _slot_registry (keyed by thread-id) lets stop() close every active
opened on the *same* browser, then returned to the queue. Browser launch overhead slot across all threads without walking thread-local storage.
is only paid at startup and during idle-cleanup replenishment. - Replenishment: after each use the dirty context is closed and a fresh one
- Idle cleanup: daemon thread closes slots idle for >5 minutes to avoid memory leaks opened on the same browser. Browser launch overhead is paid at most once
when the service is quiet. per worker thread lifetime.
- Graceful degradation: if Playwright / Xvfb is unavailable (host-side test env), - Graceful degradation: if Playwright / Xvfb is unavailable, fetch_html falls
``fetch_html`` falls back to launching a fresh browser per call, same behavior back to _fetch_fresh (identical behavior to before this module existed).
as before this module existed.
Pool size is controlled via ``BROWSER_POOL_SIZE`` env var (default: 2). Pool size is read from BROWSER_POOL_SIZE env var (default: 2) but is now a
soft limit used only for documentation; actual concurrency is bounded by
uvicorn's thread count.
""" """
from __future__ import annotations from __future__ import annotations
import itertools import itertools
import logging import logging
import os import os
import queue
import subprocess import subprocess
import threading import threading
import time import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Optional from typing import Optional
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
# Display counter shared by pool warmup and _fetch_fresh fallback.
# Range :200-:399 avoids low-numbered displays that may be pre-occupied by
# the host X server or lingering kernel sockets from previous runs.
_pool_display_counter = itertools.cycle(range(200, 400)) _pool_display_counter = itertools.cycle(range(200, 400))
_IDLE_TIMEOUT_SECS = 300 # 5 minutes
_CLEANUP_INTERVAL_SECS = 60
_QUEUE_TIMEOUT_SECS = 3.0
_CHROMIUM_ARGS = ["--no-sandbox", "--disable-dev-shm-usage"] _CHROMIUM_ARGS = ["--no-sandbox", "--disable-dev-shm-usage"]
_XVFB_ARGS = ["-screen", "0", "1280x800x24", "-ac"] # -ac: disable X auth (safe in isolated Docker) _XVFB_ARGS = ["-screen", "0", "1280x800x24", "-ac"]
_USER_AGENT = ( _USER_AGENT = (
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 " "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36" "(KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
) )
_VIEWPORT = {"width": 1280, "height": 800} _VIEWPORT = {"width": 1280, "height": 800}
# Thread-local storage: each thread gets its own _PooledBrowser slot.
_thread_local = threading.local()
@dataclass @dataclass
class _PooledBrowser: class _PooledBrowser:
"""One slot in the browser pool.""" """One browser slot, bound to a single thread."""
xvfb: subprocess.Popen xvfb: subprocess.Popen
pw: object # playwright instance (sync_playwright().__enter__()) pw: object # playwright instance (sync_playwright().__enter__())
browser: object # playwright Browser browser: object # playwright Browser
@ -63,13 +61,13 @@ class _PooledBrowser:
last_used_ts: float = field(default_factory=time.time) last_used_ts: float = field(default_factory=time.time)
def _launch_slot() -> "_PooledBrowser": def _launch_slot() -> _PooledBrowser:
"""Launch a new Xvfb display + headed Chromium browser + fresh context. """Launch a new Xvfb display + headed Chromium browser + fresh context.
Raises on failure; callers must catch and handle gracefully. Must be called from the thread that will use the slot.
""" """
from playwright.sync_api import sync_playwright from playwright.sync_api import sync_playwright
from playwright_stealth import Stealth # noqa: F401 — imported here to confirm availability from playwright_stealth import Stealth # noqa: F401
display_num = next(_pool_display_counter) display_num = next(_pool_display_counter)
display = f":{display_num}" display = f":{display_num}"
@ -81,7 +79,6 @@ def _launch_slot() -> "_PooledBrowser":
stdout=subprocess.DEVNULL, stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
) )
# Small grace period for Xvfb to bind the display socket.
time.sleep(0.3) time.sleep(0.3)
pw = sync_playwright().start() pw = sync_playwright().start()
@ -112,7 +109,7 @@ def _launch_slot() -> "_PooledBrowser":
def _close_slot(slot: _PooledBrowser) -> None: def _close_slot(slot: _PooledBrowser) -> None:
"""Cleanly close a pool slot: context → browser → Playwright → Xvfb.""" """Cleanly close a slot: context → browser → Playwright → Xvfb."""
try: try:
slot.ctx.close() slot.ctx.close()
except Exception: except Exception:
@ -133,11 +130,7 @@ def _close_slot(slot: _PooledBrowser) -> None:
def _replenish_slot(slot: _PooledBrowser) -> _PooledBrowser: def _replenish_slot(slot: _PooledBrowser) -> _PooledBrowser:
"""Close the used context and open a fresh one on the same browser. """Close the used context and open a fresh one on the same browser."""
Returns a new _PooledBrowser sharing the same xvfb/pw/browser but with a
clean context; avoids paying browser launch overhead on every fetch.
"""
try: try:
slot.ctx.close() slot.ctx.close()
except Exception: except Exception:
@ -158,26 +151,27 @@ def _replenish_slot(slot: _PooledBrowser) -> _PooledBrowser:
class BrowserPool: class BrowserPool:
"""Thread-safe pool of pre-warmed Playwright browser contexts.""" """Thread-local Playwright browser manager.
Each thread that calls fetch_html() owns its own browser instance.
No slots are shared between threads.
"""
def __init__(self, size: int = 2) -> None: def __init__(self, size: int = 2) -> None:
self._size = size self._size = size
self._q: queue.Queue[_PooledBrowser] = queue.Queue()
self._lock = threading.Lock() self._lock = threading.Lock()
self._started = False self._started = False
self._stopped = False self._stopped = False
self._playwright_available: Optional[bool] = None # cached after first check self._playwright_available: Optional[bool] = None
# Registry of all active slots keyed by thread id — used only by stop().
self._slot_registry: dict[int, _PooledBrowser] = {}
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Lifecycle # Lifecycle
# ------------------------------------------------------------------ # ------------------------------------------------------------------
def start(self) -> None: def start(self) -> None:
"""Pre-warm N browser slots in background threads. """Mark the pool as started. Slots are created lazily per thread."""
Non-blocking: returns immediately; slots appear in the queue as they
finish launching. Safe to call multiple times (no-op after first).
"""
with self._lock: with self._lock:
if self._started: if self._started:
return return
@ -190,43 +184,19 @@ class BrowserPool:
) )
return return
def _warm_one(_: int) -> None: log.info("BrowserPool: started (thread-local mode, size hint=%d)", self._size)
try:
slot = _launch_slot()
self._q.put(slot)
log.debug("BrowserPool: slot :%d ready", slot.display_num)
except Exception as exc:
log.warning("BrowserPool: pre-warm failed: %s", exc)
with ThreadPoolExecutor(max_workers=self._size) as ex:
futures = [ex.submit(_warm_one, i) for i in range(self._size)]
# Don't wait — executor exits after submitting, threads continue.
# Actually ThreadPoolExecutor.__exit__ waits for completion, which
# is fine: pre-warming completes in background relative to FastAPI
# startup because this whole method is called from a thread.
for f in as_completed(futures):
pass # propagate exceptions via logging, not raises
_idle_cleaner = threading.Thread(
target=self._idle_cleanup_loop, daemon=True, name="browser-pool-idle-cleaner"
)
_idle_cleaner.start()
log.info("BrowserPool: started with %d slots", self._q.qsize())
def stop(self) -> None: def stop(self) -> None:
"""Drain and close all pool slots. Called at FastAPI shutdown.""" """Close all active slots across all threads."""
with self._lock: with self._lock:
self._stopped = True self._stopped = True
registry_snapshot = dict(self._slot_registry)
closed = 0 closed = 0
while True: for slot in registry_snapshot.values():
try:
slot = self._q.get_nowait()
_close_slot(slot) _close_slot(slot)
closed += 1 closed += 1
except queue.Empty: self._slot_registry.clear()
break
log.info("BrowserPool: stopped, closed %d slot(s)", closed) log.info("BrowserPool: stopped, closed %d slot(s)", closed)
# ------------------------------------------------------------------ # ------------------------------------------------------------------
@ -242,28 +212,13 @@ class BrowserPool:
) -> str: ) -> str:
"""Navigate to *url* and return the rendered HTML. """Navigate to *url* and return the rendered HTML.
Borrows a browser context from the pool (blocks up to 3s), uses it to Uses the calling thread's browser slot (creates one if needed).
fetch the page, then replenishes the slot with a fresh context. Falls back to a fresh browser if Playwright is unavailable or the
slot fails.
Falls back to a fully fresh browser if the pool is empty after the
timeout or if Playwright is unavailable.
Args:
wait_for_selector: CSS/data-testid selector to wait for before capturing
HTML (e.g. ``"[data-testid='SearchResults']"``). When set, the fixed
*wait_for_timeout_ms* sleep is skipped; the page is captured as soon
as the selector appears (or after 15s timeout, whichever comes first).
wait_for_timeout_ms: static post-navigation sleep in ms when
*wait_for_selector* is None. Default 2000; set higher (e.g. 8000)
for sites with JS challenge pages (Cloudflare Turnstile).
""" """
time.sleep(delay) time.sleep(delay)
slot: Optional[_PooledBrowser] = None slot = self._get_or_create_thread_slot()
try:
slot = self._q.get(timeout=_QUEUE_TIMEOUT_SECS)
except queue.Empty:
log.debug("BrowserPool: pool empty after %.1fs — using fresh browser", _QUEUE_TIMEOUT_SECS)
if slot is not None: if slot is not None:
try: try:
@ -272,32 +227,65 @@ class BrowserPool:
wait_for_selector=wait_for_selector, wait_for_selector=wait_for_selector,
wait_for_timeout_ms=wait_for_timeout_ms, wait_for_timeout_ms=wait_for_timeout_ms,
) )
# Replenish: close dirty context, open fresh one, return to queue.
try: try:
fresh_slot = _replenish_slot(slot) fresh_slot = _replenish_slot(slot)
self._q.put(fresh_slot) self._register_slot(fresh_slot)
except Exception as exc: except Exception as exc:
log.warning("BrowserPool: replenish failed, slot discarded: %s", exc) log.warning("BrowserPool: replenish failed, slot discarded: %s", exc)
_close_slot(slot) _close_slot(slot)
self._unregister_slot()
return html return html
except Exception as exc: except Exception as exc:
log.warning("BrowserPool: pooled fetch failed (%s) — closing slot", exc) log.warning("BrowserPool: pooled fetch failed (%s) — closing slot", exc)
_close_slot(slot) _close_slot(slot)
# Fall through to fresh browser below. self._unregister_slot()
# Fallback: fresh browser (same code as old scraper._fetch_url).
return self._fetch_fresh( return self._fetch_fresh(
url, url,
wait_for_selector=wait_for_selector, wait_for_selector=wait_for_selector,
wait_for_timeout_ms=wait_for_timeout_ms, wait_for_timeout_ms=wait_for_timeout_ms,
) )
# ------------------------------------------------------------------
# Thread-local slot management
# ------------------------------------------------------------------
def _get_or_create_thread_slot(self) -> Optional[_PooledBrowser]:
"""Return the calling thread's slot, creating it if absent."""
if not self._check_playwright():
return None
slot: Optional[_PooledBrowser] = getattr(_thread_local, "slot", None)
if slot is not None:
return slot
try:
slot = _launch_slot()
self._register_slot(slot)
log.debug("BrowserPool: launched slot :%d for thread %d",
slot.display_num, threading.get_ident())
return slot
except Exception as exc:
log.warning("BrowserPool: slot launch failed: %s", exc)
return None
def _register_slot(self, slot: _PooledBrowser) -> None:
"""Bind slot to the calling thread (both thread-local and registry)."""
_thread_local.slot = slot
with self._lock:
self._slot_registry[threading.get_ident()] = slot
def _unregister_slot(self) -> None:
"""Remove the calling thread's slot from thread-local and registry."""
_thread_local.slot = None
with self._lock:
self._slot_registry.pop(threading.get_ident(), None)
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Internal helpers # Internal helpers
# ------------------------------------------------------------------ # ------------------------------------------------------------------
def _check_playwright(self) -> bool: def _check_playwright(self) -> bool:
"""Return True if Playwright and Xvfb are importable/runnable."""
if self._playwright_available is not None: if self._playwright_available is not None:
return self._playwright_available return self._playwright_available
try: try:
@ -315,7 +303,6 @@ class BrowserPool:
wait_for_selector: Optional[str] = None, wait_for_selector: Optional[str] = None,
wait_for_timeout_ms: int = 2000, wait_for_timeout_ms: int = 2000,
) -> str: ) -> str:
"""Open a new page on *slot.ctx*, navigate to *url*, return HTML."""
from playwright_stealth import Stealth from playwright_stealth import Stealth
page = slot.ctx.new_page() page = slot.ctx.new_page()
@ -326,7 +313,7 @@ class BrowserPool:
try: try:
page.wait_for_selector(wait_for_selector, timeout=15_000) page.wait_for_selector(wait_for_selector, timeout=15_000)
except Exception: except Exception:
pass # selector didn't appear; return whatever loaded pass
else: else:
page.wait_for_timeout(wait_for_timeout_ms) page.wait_for_timeout(wait_for_timeout_ms)
return page.content() return page.content()
@ -342,7 +329,6 @@ class BrowserPool:
wait_for_selector: Optional[str] = None, wait_for_selector: Optional[str] = None,
wait_for_timeout_ms: int = 2000, wait_for_timeout_ms: int = 2000,
) -> str: ) -> str:
"""Launch a fully fresh browser, fetch *url*, close everything."""
import subprocess as _subprocess import subprocess as _subprocess
try: try:
@ -364,7 +350,7 @@ class BrowserPool:
stdout=_subprocess.DEVNULL, stdout=_subprocess.DEVNULL,
stderr=_subprocess.DEVNULL, stderr=_subprocess.DEVNULL,
) )
time.sleep(0.3) # wait for Xvfb to bind the display socket before Chromium starts time.sleep(0.3)
try: try:
with sync_playwright() as pw: with sync_playwright() as pw:
browser = pw.chromium.launch( browser = pw.chromium.launch(
@ -383,7 +369,7 @@ class BrowserPool:
try: try:
page.wait_for_selector(wait_for_selector, timeout=15_000) page.wait_for_selector(wait_for_selector, timeout=15_000)
except Exception: except Exception:
pass # selector didn't appear; return whatever loaded pass
else: else:
page.wait_for_timeout(wait_for_timeout_ms) page.wait_for_timeout(wait_for_timeout_ms)
html = page.content() html = page.content()
@ -394,32 +380,6 @@ class BrowserPool:
return html return html
def _idle_cleanup_loop(self) -> None:
"""Daemon thread: drain slots idle for >5 minutes every 60 seconds."""
while not self._stopped:
time.sleep(_CLEANUP_INTERVAL_SECS)
if self._stopped:
break
now = time.time()
idle_cutoff = now - _IDLE_TIMEOUT_SECS
# Drain the entire queue, keep non-idle slots, close idle ones.
kept: list[_PooledBrowser] = []
closed = 0
while True:
try:
slot = self._q.get_nowait()
except queue.Empty:
break
if slot.last_used_ts < idle_cutoff:
_close_slot(slot)
closed += 1
else:
kept.append(slot)
for slot in kept:
self._q.put(slot)
if closed:
log.info("BrowserPool: idle cleanup closed %d slot(s)", closed)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Module-level singleton # Module-level singleton
@ -430,11 +390,7 @@ _pool_lock = threading.Lock()
def get_pool() -> BrowserPool: def get_pool() -> BrowserPool:
"""Return the module-level BrowserPool singleton (creates it if needed). """Return the module-level BrowserPool singleton (creates it if needed)."""
Pool size is read from ``BROWSER_POOL_SIZE`` env var (default: 2).
Call ``get_pool().start()`` at FastAPI startup to pre-warm slots.
"""
global _pool global _pool
if _pool is None: if _pool is None:
with _pool_lock: with _pool_lock:
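
Reviewer sketch (not part of the commit): the thread-local slot plus registry pattern described in the module docstring can be shown without Playwright. This is a simplified illustration under stated assumptions; `Slot` and `ThreadLocalManager` are hypothetical stand-ins for `_PooledBrowser` and `BrowserPool`, and "closing" a slot here only flips a flag.

```python
from __future__ import annotations

import threading
from typing import Optional


class Slot:
    """Hypothetical stand-in for _PooledBrowser."""

    def __init__(self, owner: int) -> None:
        self.owner = owner
        self.closed = False


class ThreadLocalManager:
    """Each thread lazily gets its own slot; a registry lets shutdown find them all."""

    def __init__(self) -> None:
        self._local = threading.local()
        self._lock = threading.Lock()
        self._registry: dict[int, Slot] = {}

    def get_slot(self) -> Slot:
        slot: Optional[Slot] = getattr(self._local, "slot", None)
        if slot is None:
            # First call on this thread: create and record the slot.
            slot = Slot(threading.get_ident())
            self._local.slot = slot
            with self._lock:
                self._registry[slot.owner] = slot
        return slot

    def close_all(self) -> int:
        # Snapshot and clear under the lock, then close outside it.
        with self._lock:
            slots = list(self._registry.values())
            self._registry.clear()
        for slot in slots:
            slot.closed = True
        return len(slots)
```

The registry exists because `threading.local` values are only reachable from their owning thread, so a shutdown hook running elsewhere needs a separate index.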

View file

@ -25,7 +25,7 @@ log = logging.getLogger(__name__)
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from app.db.models import Listing, MarketComp, Seller from app.db.models import Listing, MarketComp, Seller
from app.db.store import Store from app.db.protocol import SharedTableProtocol
from app.platforms import PlatformAdapter, SearchFilters from app.platforms import PlatformAdapter, SearchFilters
EBAY_SEARCH_URL = "https://www.ebay.com/sch/i.html" EBAY_SEARCH_URL = "https://www.ebay.com/sch/i.html"
@ -286,7 +286,7 @@ class ScrapedEbayAdapter(PlatformAdapter):
category_history) cause TrustScorer to set score_is_partial=True. category_history) cause TrustScorer to set score_is_partial=True.
""" """
def __init__(self, shared_store: Store, delay: float = 1.0): def __init__(self, shared_store: SharedTableProtocol, delay: float = 1.0):
self._store = shared_store self._store = shared_store
self._delay = delay self._delay = delay
@ -374,8 +374,6 @@ class ScrapedEbayAdapter(PlatformAdapter):
Does not raise; failures per-seller are silently skipped so the main Does not raise; failures per-seller are silently skipped so the main
search response is never blocked. search response is never blocked.
""" """
db_path = self._store._db_path # capture for thread-local Store creation
def _enrich_one(item: tuple[str, str]) -> None: def _enrich_one(item: tuple[str, str]) -> None:
seller_id, listing_id = item seller_id, listing_id = item
try: try:
@ -388,7 +386,7 @@ class ScrapedEbayAdapter(PlatformAdapter):
) )
if age_days is None and fb_count is None: if age_days is None and fb_count is None:
return # nothing new to write return # nothing new to write
thread_store = Store(db_path) thread_store = self._store.clone()
seller = thread_store.get_seller("ebay", seller_id) seller = thread_store.get_seller("ebay", seller_id)
if not seller: if not seller:
log.warning("BTF enrich: seller %s not found in DB", seller_id) log.warning("BTF enrich: seller %s not found in DB", seller_id)

View file

@ -7,28 +7,30 @@ Current task types:
trust_photo_analysis: download primary photo, run vision LLM, write trust_photo_analysis: download primary photo, run vision LLM, write
result to trust_scores.photo_analysis_json (Paid tier). result to trust_scores.photo_analysis_json (Paid tier).
Prompt note: The vision prompt is a functional first pass. Tune against real Image assessment routing:
eBay listings before GA, specifically stock-photo vs genuine-product distinction Cloud (GPU_SERVER_URL set): allocates via cf-orch task endpoint
and the damage vocabulary. product=snipe, task=image_assessment.
Local (no GPU_SERVER_URL) or TaskNotFound fallback: uses LLMRouter
with a vision-capable local backend (moondream2, llava, etc.).
""" """
from __future__ import annotations from __future__ import annotations
import base64 import base64
import json import json
import logging import logging
import os
from pathlib import Path from pathlib import Path
import httpx
import requests import requests
from circuitforge_core.db import get_connection from circuitforge_core.db import get_connection
from circuitforge_core.llm import LLMRouter
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
LLM_TASK_TYPES: frozenset[str] = frozenset({"trust_photo_analysis"}) LLM_TASK_TYPES: frozenset[str] = frozenset({"trust_photo_analysis"})
VRAM_BUDGETS: dict[str, float] = { VRAM_BUDGETS: dict[str, float] = {
# moondream2 / vision-capable LLM — single image, short response "trust_photo_analysis": 6000, # Q5_K_M Qwen2-VL via cf-orch; LLMRouter fallback uses 2.0 GB
"trust_photo_analysis": 2.0,
} }
_VISION_SYSTEM_PROMPT = ( _VISION_SYSTEM_PROMPT = (
@ -51,8 +53,7 @@ def insert_task(
) -> tuple[int, bool]: ) -> tuple[int, bool]:
"""Insert a background task if no identical task is already in-flight. """Insert a background task if no identical task is already in-flight.
Uses get_connection() so WAL mode and timeout=30 apply, same as all other Returns (task_id, is_new).
Snipe DB access. Returns (task_id, is_new).
""" """
conn = get_connection(db_path) conn = get_connection(db_path)
conn.row_factory = __import__("sqlite3").Row conn.row_factory = __import__("sqlite3").Row
@ -120,32 +121,26 @@ def _run_trust_photo_analysis(
p = json.loads(params or "{}") p = json.loads(params or "{}")
photo_url = p.get("photo_url", "") photo_url = p.get("photo_url", "")
listing_title = p.get("listing_title", "") listing_title = p.get("listing_title", "")
# user_db: per-user DB in cloud mode; same as db_path in local mode.
result_db = Path(p.get("user_db", str(db_path))) result_db = Path(p.get("user_db", str(db_path)))
if not photo_url: if not photo_url:
raise ValueError("trust_photo_analysis: 'photo_url' is required in params") raise ValueError("trust_photo_analysis: 'photo_url' is required in params")
# Download and base64-encode the photo
resp = requests.get(photo_url, timeout=10) resp = requests.get(photo_url, timeout=10)
resp.raise_for_status() resp.raise_for_status()
image_b64 = base64.b64encode(resp.content).decode() image_b64 = base64.b64encode(resp.content).decode()
image_data_url = f"data:image/jpeg;base64,{image_b64}"
# Build user prompt with optional title context user_prompt = "Assess this listing image."
user_prompt = "Evaluate this eBay listing photo."
if listing_title: if listing_title:
user_prompt = f"Evaluate this eBay listing photo for: {listing_title}" user_prompt = f"Assess this eBay listing image: {listing_title}"
# Call LLMRouter with vision capability cforch_url = os.getenv("GPU_SERVER_URL") or os.getenv("CF_ORCH_URL")
router = LLMRouter() if cforch_url:
raw = router.complete( raw = _assess_via_orch(cforch_url, image_data_url, user_prompt)
user_prompt, else:
system=_VISION_SYSTEM_PROMPT, raw = _assess_via_local_llm(image_b64, user_prompt)
images=[image_b64],
max_tokens=128,
)
# Parse — be lenient: strip markdown fences if present
try: try:
cleaned = raw.strip().removeprefix("```json").removeprefix("```").removesuffix("```").strip() cleaned = raw.strip().removeprefix("```json").removeprefix("```").removesuffix("```").strip()
analysis = json.loads(cleaned) analysis = json.loads(cleaned)
@ -168,3 +163,54 @@ def _run_trust_photo_analysis(
         analysis.get("visible_damage"),
         analysis.get("confidence"),
     )
+
+
+def _assess_via_orch(cforch_url: str, image_data_url: str, user_prompt: str) -> str:
+    """Run photo assessment via cf-orch task endpoint (cloud path)."""
+    from circuitforge_orch.client import CFOrchClient, TaskNotFound
+
+    client = CFOrchClient(cforch_url)
+    try:
+        with client.task_allocate("snipe", "image_assessment") as alloc:
+            resp = httpx.post(
+                f"{alloc.url}/v1/chat/completions",
+                json={
+                    "model": alloc.model or "__auto__",
+                    "messages": [
+                        {
+                            "role": "system",
+                            "content": _VISION_SYSTEM_PROMPT,
+                        },
+                        {
+                            "role": "user",
+                            "content": [
+                                {"type": "image_url", "image_url": {"url": image_data_url}},
+                                {"type": "text", "text": user_prompt},
+                            ],
+                        },
+                    ],
+                    "max_tokens": 128,
+                },
+                timeout=60.0,
+            )
+            resp.raise_for_status()
+            return resp.json()["choices"][0]["message"]["content"]
+    except TaskNotFound:
+        log.warning(
+            "snipe.image_assessment not registered in cf-orch — falling back to local LLM"
+        )
+        image_b64 = image_data_url.split(",", 1)[1]
+        return _assess_via_local_llm(image_b64, user_prompt)
+
+
+def _assess_via_local_llm(image_b64: str, user_prompt: str) -> str:
+    """Run photo assessment via local LLMRouter (local/self-hosted path)."""
+    from app.llm.router import LLMRouter
+
+    router = LLMRouter()
+    return router.complete(
+        user_prompt,
+        system=_VISION_SYSTEM_PROMPT,
+        images=[image_b64],
+        max_tokens=128,
+    )
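
The hunk above relies on two small string manipulations: stripping an optional markdown fence from the model reply before `json.loads`, and recovering the base64 payload from a data URL when falling back to the local path. A minimal standalone sketch of both follows. The function names `parse_llm_json` and `split_data_url` are hypothetical and do not appear in the diff; the fence string is built indirectly only so the example nests cleanly here.

```python
import json

# Three backticks, built indirectly so this example can sit inside a fenced block.
FENCE = "`" * 3


def parse_llm_json(raw: str) -> dict:
    """Strip an optional markdown fence, then parse the remainder as JSON.

    Hypothetical helper mirroring the chained removeprefix/removesuffix
    calls in the diff; json.loads still raises on malformed input.
    """
    cleaned = (
        raw.strip()
        .removeprefix(FENCE + "json")
        .removeprefix(FENCE)
        .removesuffix(FENCE)
        .strip()
    )
    return json.loads(cleaned)


def split_data_url(data_url: str) -> str:
    """Return the payload after the first comma of a data URL.

    Raises IndexError when no comma is present, which is the same
    behaviour as the split(",", 1)[1] expression in the fallback branch.
    """
    return data_url.split(",", 1)[1]
```

One thing this makes visible: the fallback branch assumes `image_data_url` always contains a comma, so a malformed URL would surface as an `IndexError` rather than a logged warning.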


@ -2,7 +2,7 @@ import hashlib
 import math
 from app.db.models import Listing, TrustScore
-from app.db.store import Store
+from app.db.protocol import SharedTableProtocol
 from .aggregator import Aggregator
 from .metadata import MetadataScorer
@ -12,7 +12,7 @@ from .photo import PhotoScorer
 class TrustScorer:
     """Orchestrates metadata + photo scoring for a batch of listings."""

-    def __init__(self, shared_store: Store):
+    def __init__(self, shared_store: SharedTableProtocol):
         self._store = shared_store
         self._meta = MetadataScorer()
         self._photo = PhotoScorer()


@ -126,7 +126,12 @@ class Aggregator:
         # Hard filters
         if seller and seller.account_age_days is not None and seller.account_age_days < HARD_FILTER_AGE_DAYS:
             red_flags.append("new_account")
-        if seller and seller.feedback_ratio < HARD_FILTER_BAD_RATIO_THRESHOLD:
+        if seller and seller.feedback_ratio == 0.0 and seller.feedback_count > 0:
+            # 12-month ratio missing from page — returning seller or buyer-only account.
+            # Score will be partial (metadata._feedback_ratio returns None). Soft flag
+            # only: do NOT fire established_bad_actor on what is likely missing data.
+            red_flags.append("no_recent_seller_data")
+        elif seller and seller.feedback_ratio < HARD_FILTER_BAD_RATIO_THRESHOLD:
             if HARD_FILTER_BAD_RATIO_MIN_COUNT < seller.feedback_count <= HARD_FILTER_BAD_RATIO_MAX_COUNT:
                 # Moderate-volume account with consistently bad ratio → hard flag.
                 red_flags.append("established_bad_actor")
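
The branch order in this hunk matters: the missing-data check has to come before the bad-ratio check, otherwise a ratio of 0.0 would always fall under the threshold. The sketch below isolates that ordering. The constant values and the `SellerStub` and `ratio_flags` names are assumptions made for illustration; the real thresholds are defined elsewhere in the aggregator and are not visible in this diff.

```python
from dataclasses import dataclass
from typing import List, Optional

# Hypothetical values: the real constants are not shown in the diff.
HARD_FILTER_BAD_RATIO_THRESHOLD = 0.80
HARD_FILTER_BAD_RATIO_MIN_COUNT = 20
HARD_FILTER_BAD_RATIO_MAX_COUNT = 500


@dataclass
class SellerStub:
    """Stand-in for the two Seller fields this branch reads."""
    feedback_ratio: float
    feedback_count: int


def ratio_flags(seller: Optional[SellerStub]) -> List[str]:
    """Return the feedback-ratio red flags for one seller."""
    flags: List[str] = []
    if seller is None:
        return flags
    if seller.feedback_ratio == 0.0 and seller.feedback_count > 0:
        # Ratio absent from the page: soft flag only, checked first.
        flags.append("no_recent_seller_data")
    elif seller.feedback_ratio < HARD_FILTER_BAD_RATIO_THRESHOLD:
        if HARD_FILTER_BAD_RATIO_MIN_COUNT < seller.feedback_count <= HARD_FILTER_BAD_RATIO_MAX_COUNT:
            flags.append("established_bad_actor")
    return flags
```

With these assumed thresholds, a seller with ratio 0.0 and 150 feedback entries gets only the soft flag, while ratio 0.5 with the same count gets the hard flag.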


@ -44,7 +44,13 @@ class MetadataScorer:
         if count < 200: return 15
         return 20

-    def _feedback_ratio(self, ratio: float, count: int) -> int:
+    def _feedback_ratio(self, ratio: float, count: int) -> Optional[int]:
+        # ratio=0.0 with count>0 means the 12-month percentage wasn't on the page —
+        # eBay omits the ratio for returning/buyer-only sellers with no recent sales.
+        # Treat as missing rather than "literally 0% positive" (which eBay doesn't allow
+        # on active accounts — those get suspended long before reaching 0%).
+        if ratio == 0.0 and count > 0:
+            return None
         if ratio < 0.80 and count > 20: return 0
         if ratio < 0.90: return 5
         if ratio < 0.95: return 10
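
Changing the return type to `Optional[int]` means every caller that sums signal scores now has to tolerate `None`. The hunk does not show those callers, or whether `Optional` is already imported in this module, so the following is only a sketch of one way a partial score could be combined. The `combine_signals` name and the `(total, is_partial)` return shape are hypothetical.

```python
from typing import Dict, Optional, Tuple


def combine_signals(signals: Dict[str, Optional[int]]) -> Tuple[int, bool]:
    """Sum the signals that are present and report whether any were missing.

    Hypothetical helper: a None value means "no data", not "zero points",
    so it is skipped instead of being added as 0.
    """
    present = [points for points in signals.values() if points is not None]
    is_partial = len(present) < len(signals)
    return sum(present), is_partial
```

For example, `combine_signals({"account_age": 15, "feedback_ratio": None})` yields a total of 15 and marks the score as partial, which keeps a missing ratio from being read as a bad one.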


@ -20,9 +20,12 @@ services:
       CLOUD_MODE: "true"
       CLOUD_DATA_ROOT: /devl/snipe-cloud-data
       # DIRECTUS_JWT_SECRET, HEIMDALL_URL, HEIMDALL_ADMIN_TOKEN — set in .env (never commit)
-      # CF_ORCH_URL routes LLM query builder through cf-orch for VRAM-aware scheduling.
+      # GPU_SERVER_URL routes LLM query builder through cf-orch for VRAM-aware scheduling.
       # Override in .env to use a different coordinator URL.
-      CF_ORCH_URL: "http://host.docker.internal:7700"
+      GPU_SERVER_URL: "http://host.docker.internal:7700"
+      # SNIPE_SHARED_DB_URL — Postgres DSN for shared tables (sellers, market_comps, blocklist).
+      # Required for production multi-user deployments. Set in .env (never commit).
+      # SNIPE_SHARED_DB_URL: "postgresql://snipe:<password>@postgres:5432/snipe_shared"
       CF_APP_NAME: snipe
     extra_hosts:
       - "host.docker.internal:host-gateway"


@ -18,8 +18,8 @@ services:
     environment:
       - RELOAD=true
       # Point the LLM/vision task scheduler at the local cf-orch coordinator.
-      # Only has effect when CF_ORCH_URL is set (uncomment in .env, or set inline).
-      # - CF_ORCH_URL=http://10.1.10.71:7700
+      # Only has effect when GPU_SERVER_URL is set (uncomment in .env, or set inline).
+      # - GPU_SERVER_URL=http://10.1.10.71:7700

   # cf-orch agent — routes trust_photo_analysis vision tasks to the GPU coordinator.
   # Only starts when you pass --profile orch:


@ -6,7 +6,7 @@
 # (claude_code, copilot) are intentionally excluded here.
 #
 # CF Orchestrator routes both ollama and vllm allocations for VRAM-aware
-# scheduling. CF_ORCH_URL must be set in .env for allocations to resolve;
+# scheduling. GPU_SERVER_URL must be set in .env for allocations to resolve;
 # if cf-orch is unreachable the backend falls back to its static base_url.
 #
 # Model choice for query builder: llama3.1:8b
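
The config comments now name only `GPU_SERVER_URL`, while the photo-analysis hunk still reads `CF_ORCH_URL` as a second choice. A small sketch of that precedence follows, assuming the same `or` chain is what is intended everywhere. `resolve_coordinator_url` is a hypothetical name, and the mapping parameter exists only to make the behaviour testable without touching the real environment.

```python
import os
from typing import Mapping, Optional


def resolve_coordinator_url(env: Mapping[str, str] = os.environ) -> Optional[str]:
    """Prefer GPU_SERVER_URL, fall back to the legacy CF_ORCH_URL.

    Because this uses `or`, an empty GPU_SERVER_URL is treated as unset
    and the legacy variable is consulted next.
    """
    return env.get("GPU_SERVER_URL") or env.get("CF_ORCH_URL") or None
```

A deployment that exports an empty `GPU_SERVER_URL` while leaving an old `CF_ORCH_URL` in place would therefore still route through the coordinator, which may or may not be the intent.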

Binary file not shown (image; 118 KiB before, 122 KiB after).

Binary file not shown (new image; 164 KiB).

@ -8,7 +8,7 @@ version = "0.3.0"
 description = "Auction listing monitor and trust scorer"
 requires-python = ">=3.11"
 dependencies = [
-    "circuitforge-core>=0.8.0",
+    "circuitforge-core[community]>=0.8.0",
     "streamlit>=1.32",
     "requests>=2.31",
     "imagehash>=4.3",
@ -23,14 +23,20 @@ dependencies = [
     "playwright-stealth>=1.0",
     "cryptography>=42.0",
     "PyJWT>=2.8",
+    "httpx>=0.27",
 ]

 [project.optional-dependencies]
+orchestration = [
+    # Paid+ tier only — not published to PyPI. Install from source or Forgejo Packages.
+    #   pip install -e ../circuitforge-orch (dev)
+    #   pip install snipe[orchestration] (self-hosted Paid+)
+    "circuitforge-orch>=0.1.0",
+]
 dev = [
     "pytest>=8.0",
     "pytest-cov>=5.0",
     "ruff>=0.4",
-    "httpx>=0.27",          # FastAPI test client
 ]

 [tool.setuptools.packages.find]

tests/conftest.py (new file, 17 lines)

@ -0,0 +1,17 @@
+import os
+import pytest
+
+
+def pytest_configure(config):
+    config.addinivalue_line(
+        "markers",
+        "postgres: mark test as requiring a live Postgres instance (SNIPE_SHARED_DB_URL must be set)",
+    )
+
+
+@pytest.fixture
+def postgres_dsn():
+    dsn = os.environ.get("SNIPE_SHARED_DB_URL")
+    if not dsn:
+        pytest.skip("SNIPE_SHARED_DB_URL not set — skipping Postgres tests")
+    return dsn

tests/db/test_pg_shared.py (new file, 157 lines)

@ -0,0 +1,157 @@
+"""Tests for SnipeSharedStore — requires live Postgres via SNIPE_SHARED_DB_URL."""
+import pytest
+
+from app.db.models import MarketComp, Seller
+from app.db.pg_shared import SnipeSharedDB, SnipeSharedStore
+from app.db.protocol import SharedTableProtocol
+
+
+@pytest.mark.postgres
+def test_snipe_shared_store_satisfies_protocol(postgres_dsn):
+    assert issubclass(SnipeSharedStore, SharedTableProtocol)
+
+
+@pytest.mark.postgres
+def test_save_and_get_seller(postgres_dsn):
+    db = SnipeSharedDB(postgres_dsn)
+    db.run_migrations()
+    store = SnipeSharedStore(db)
+
+    seller = Seller(
+        platform="ebay",
+        platform_seller_id="test-seller-001",
+        username="testseller",
+        account_age_days=365,
+        feedback_count=100,
+        feedback_ratio=0.99,
+        category_history_json='{"electronics": 5}',
+    )
+    store.save_seller(seller)
+
+    result = store.get_seller("ebay", "test-seller-001")
+    assert result is not None
+    assert result.username == "testseller"
+    assert result.feedback_count == 100
+
+    store.delete_seller_data("ebay", "test-seller-001")
+    db.close()
+
+
+@pytest.mark.postgres
+def test_save_sellers_coalesce_preserves_age(postgres_dsn):
+    db = SnipeSharedDB(postgres_dsn)
+    db.run_migrations()
+    store = SnipeSharedStore(db)
+
+    seller_with_age = Seller(
+        platform="ebay", platform_seller_id="coalesce-test",
+        username="u", account_age_days=730,
+        feedback_count=50, feedback_ratio=0.95, category_history_json="{}",
+    )
+    store.save_seller(seller_with_age)
+
+    seller_without_age = Seller(
+        platform="ebay", platform_seller_id="coalesce-test",
+        username="u", account_age_days=None,
+        feedback_count=60, feedback_ratio=0.96, category_history_json="{}",
+    )
+    store.save_sellers([seller_without_age])
+
+    result = store.get_seller("ebay", "coalesce-test")
+    assert result.account_age_days == 730
+    assert result.feedback_count == 60
+
+    store.delete_seller_data("ebay", "coalesce-test")
+    db.close()
+
+
+@pytest.mark.postgres
+def test_market_comp_cache(postgres_dsn):
+    from datetime import datetime, timedelta, timezone
+
+    db = SnipeSharedDB(postgres_dsn)
+    db.run_migrations()
+    store = SnipeSharedStore(db)
+
+    expires = (datetime.now(timezone.utc) + timedelta(hours=1)).isoformat()
+    comp = MarketComp(
+        platform="ebay", query_hash="abc123",
+        median_price=49.99, sample_count=10, expires_at=expires,
+    )
+    store.save_market_comp(comp)
+
+    result = store.get_market_comp("ebay", "abc123")
+    assert result is not None
+    assert result.median_price == 49.99
+    db.close()
+
+
+@pytest.mark.postgres
+def test_reported_sellers(postgres_dsn):
+    db = SnipeSharedDB(postgres_dsn)
+    db.run_migrations()
+    store = SnipeSharedStore(db)
+
+    store.mark_reported("ebay", "bad-seller-99", username="badguy")
+    reported = store.list_reported("ebay")
+    assert "bad-seller-99" in reported
+
+    store.mark_reported("ebay", "bad-seller-99")  # idempotent
+    db.close()
+
+
+@pytest.mark.postgres
+def test_clone_returns_self(postgres_dsn):
+    db = SnipeSharedDB(postgres_dsn)
+    store = SnipeSharedStore(db)
+    assert store.clone() is store
+    db.close()
+
+
+@pytest.mark.postgres
+def test_blocklist_add_get_remove(postgres_dsn):
+    from app.db.models import ScammerEntry
+
+    db = SnipeSharedDB(postgres_dsn)
+    db.run_migrations()
+    store = SnipeSharedStore(db)
+
+    assert not store.is_blocklisted("ebay", "bad-999")
+
+    entry = store.add_to_blocklist(ScammerEntry(
+        platform="ebay", platform_seller_id="bad-999",
+        username="scammer1", reason="sold fakes", source="manual",
+    ))
+    assert entry.id is not None
+    assert store.is_blocklisted("ebay", "bad-999")
+
+    entries = store.list_blocklist("ebay")
+    assert any(e.platform_seller_id == "bad-999" for e in entries)
+
+    store.remove_from_blocklist("ebay", "bad-999")
+    assert not store.is_blocklisted("ebay", "bad-999")
+    db.close()
+
+
+@pytest.mark.postgres
+def test_blocklist_upsert_is_idempotent(postgres_dsn):
+    from app.db.models import ScammerEntry
+
+    db = SnipeSharedDB(postgres_dsn)
+    db.run_migrations()
+    store = SnipeSharedStore(db)
+
+    store.add_to_blocklist(ScammerEntry(
+        platform="ebay", platform_seller_id="dup-test",
+        username="seller", reason="reason1", source="manual",
+    ))
+    # Second add — should not raise, should update username but preserve reason via COALESCE
+    store.add_to_blocklist(ScammerEntry(
+        platform="ebay", platform_seller_id="dup-test",
+        username="seller_updated", reason=None, source="community",
+    ))
+
+    entries = [e for e in store.list_blocklist("ebay") if e.platform_seller_id == "dup-test"]
+    assert len(entries) == 1
+    assert entries[0].username == "seller_updated"
+    assert entries[0].reason == "reason1"  # COALESCE preserved original reason
+
+    store.remove_from_blocklist("ebay", "dup-test")
+    db.close()
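
The last test above expects a second `add_to_blocklist` call to update `username` while keeping the original `reason` when the new one is `None`. The Postgres migration itself is not part of this diff, so the sketch below reproduces only the upsert pattern, and it swaps in the standard-library `sqlite3` module as a stand-in so it runs without a database server. The table layout is an assumption, and leaving `source` untouched on conflict is a choice made here that the test does not assert either way.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE scammer_blocklist (
        id INTEGER PRIMARY KEY,
        platform TEXT NOT NULL,
        platform_seller_id TEXT NOT NULL,
        username TEXT,
        reason TEXT,
        source TEXT,
        UNIQUE (platform, platform_seller_id)
    )
    """
)

# On conflict, always take the new username, but keep the stored reason
# whenever the incoming reason is NULL.
UPSERT_SQL = """
INSERT INTO scammer_blocklist (platform, platform_seller_id, username, reason, source)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (platform, platform_seller_id) DO UPDATE SET
    username = excluded.username,
    reason = COALESCE(excluded.reason, scammer_blocklist.reason)
"""


def add_to_blocklist(platform, seller_id, username, reason, source):
    """Insert or update one blocklist row (hypothetical stand-in for the store method)."""
    conn.execute(UPSERT_SQL, (platform, seller_id, username, reason, source))
    conn.commit()


add_to_blocklist("ebay", "dup-test", "seller", "reason1", "manual")
add_to_blocklist("ebay", "dup-test", "seller_updated", None, "community")

rows = conn.execute(
    "SELECT username, reason FROM scammer_blocklist WHERE platform_seller_id = ?",
    ("dup-test",),
).fetchall()
```

The same `ON CONFLICT ... DO UPDATE` with `COALESCE(excluded.col, table.col)` form is accepted by Postgres, which is why SQLite works as a rough stand-in for this one statement.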

tests/db/test_protocol.py (new file, 39 lines)

@ -0,0 +1,39 @@
+"""Verify Store satisfies SharedTableProtocol at import time."""
+from app.db.protocol import SharedTableProtocol
+from app.db.store import Store
+
+
+def test_store_satisfies_protocol():
+    assert issubclass(Store, SharedTableProtocol)
+
+
+def test_store_clone_returns_new_instance(tmp_path):
+    db = tmp_path / "test.db"
+    s = Store(db)
+    clone = s.clone()
+    assert isinstance(clone, Store)
+    assert clone is not s
+    assert clone._db_path == db
+
+
+def test_ebay_adapter_accepts_protocol():
+    from app.platforms.ebay.adapter import EbayAdapter
+    import tempfile
+    import pathlib
+    from unittest.mock import MagicMock
+
+    with tempfile.TemporaryDirectory() as tmp:
+        s = Store(pathlib.Path(tmp) / "t.db")
+        adapter = EbayAdapter(token_manager=MagicMock(), shared_store=s)
+        assert adapter._store is s
+
+
+def test_scraped_adapter_no_db_path_ref():
+    from app.platforms.ebay.scraper import ScrapedEbayAdapter
+    import tempfile
+    import pathlib
+
+    with tempfile.TemporaryDirectory() as tmp:
+        s = Store(pathlib.Path(tmp) / "t.db")
+        adapter = ScrapedEbayAdapter(shared_store=s)
+        assert not hasattr(adapter, '_db_path_ref')
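
These tests call `issubclass(Store, SharedTableProtocol)`, which only works if the protocol is decorated with `typing.runtime_checkable` and declares methods only. `app/db/protocol.py` is not shown in this diff, so the following is a reduced sketch of how such a protocol might look. The `SharedTableSketch` name, its three methods, and both store classes are hypothetical.

```python
from typing import Optional, Protocol, runtime_checkable


@runtime_checkable
class SharedTableSketch(Protocol):
    """Reduced stand-in for a shared-table protocol (methods only)."""

    def get_seller(self, platform: str, platform_seller_id: str) -> Optional[dict]: ...
    def is_blocklisted(self, platform: str, platform_seller_id: str) -> bool: ...
    def clone(self) -> "SharedTableSketch": ...


class InMemoryStore:
    """Satisfies the protocol structurally; no inheritance required."""

    def __init__(self) -> None:
        self._sellers: dict = {}
        self._blocked: set = set()

    def get_seller(self, platform: str, platform_seller_id: str) -> Optional[dict]:
        return self._sellers.get((platform, platform_seller_id))

    def is_blocklisted(self, platform: str, platform_seller_id: str) -> bool:
        return (platform, platform_seller_id) in self._blocked

    def clone(self) -> "InMemoryStore":
        # A connection-pooled backend can hand back itself, as the
        # Postgres store test above expects.
        return self


class IncompleteStore:
    """Missing is_blocklisted and clone, so the check fails."""

    def get_seller(self, platform: str, platform_seller_id: str) -> Optional[dict]:
        return None
```

Runtime protocol checks look only at whether the named methods exist, not at their signatures or return types, so a test like `test_store_satisfies_protocol` catches a missing method but not a mismatched one.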


@ -1,16 +1,15 @@
-"""Tests for app.platforms.ebay.browser_pool.
+"""Tests for app.platforms.ebay.browser_pool (thread-local design).

 All tests run without real Chromium / Xvfb / Playwright.
 Playwright, Xvfb subprocess calls, and Stealth are mocked throughout.
 """
 from __future__ import annotations

-import queue
 import subprocess
 import threading
 import time
 from typing import Any
-from unittest.mock import MagicMock, patch, call
+from unittest.mock import MagicMock, patch

 import pytest
@ -19,40 +18,35 @@ import pytest
 # ---------------------------------------------------------------------------

 def _reset_pool_singleton():
-    """Force the module-level _pool singleton back to None."""
     import app.platforms.ebay.browser_pool as _mod
     _mod._pool = None


-# ---------------------------------------------------------------------------
-# Fixtures
-# ---------------------------------------------------------------------------
+def _reset_thread_local():
+    import app.platforms.ebay.browser_pool as _mod
+    _mod._thread_local.slot = None
+

 @pytest.fixture(autouse=True)
-def reset_singleton():
-    """Reset the singleton before and after every test."""
+def reset_pool():
     _reset_pool_singleton()
+    _reset_thread_local()
     yield
     _reset_pool_singleton()
+    _reset_thread_local()


 def _make_fake_slot():
-    """Build a mock _PooledBrowser with all necessary attributes."""
     from app.platforms.ebay.browser_pool import _PooledBrowser

     xvfb = MagicMock(spec=subprocess.Popen)
     pw = MagicMock()
     browser = MagicMock()
     ctx = MagicMock()
-    slot = _PooledBrowser(
-        xvfb=xvfb,
-        pw=pw,
-        browser=browser,
-        ctx=ctx,
-        display_num=100,
-        last_used_ts=time.time(),
+    return _PooledBrowser(
+        xvfb=xvfb, pw=pw, browser=browser, ctx=ctx,
+        display_num=100, last_used_ts=time.time(),
     )
-    return slot


 # ---------------------------------------------------------------------------
@ -62,9 +56,7 @@ def _make_fake_slot():
 class TestGetPoolSingleton:
     def test_returns_same_instance(self):
         from app.platforms.ebay.browser_pool import get_pool, BrowserPool
-        p1 = get_pool()
-        p2 = get_pool()
-        assert p1 is p2
+        assert get_pool() is get_pool()

     def test_returns_browser_pool_instance(self):
         from app.platforms.ebay.browser_pool import get_pool, BrowserPool
@ -72,14 +64,12 @@ class TestGetPoolSingleton:
     def test_default_size_is_two(self):
         from app.platforms.ebay.browser_pool import get_pool
-        pool = get_pool()
-        assert pool._size == 2
+        assert get_pool()._size == 2

     def test_custom_size_from_env(self, monkeypatch):
         monkeypatch.setenv("BROWSER_POOL_SIZE", "5")
         from app.platforms.ebay.browser_pool import get_pool
-        pool = get_pool()
-        assert pool._size == 5
+        assert get_pool()._size == 5


 # ---------------------------------------------------------------------------
@ -88,17 +78,15 @@ class TestGetPoolSingleton:
 class TestLifecycle:
     def test_start_is_noop_when_playwright_unavailable(self):
-        """Pool should handle missing Playwright gracefully — no error raised."""
         from app.platforms.ebay.browser_pool import BrowserPool

         pool = BrowserPool(size=2)
         with patch.object(pool, "_check_playwright", return_value=False):
-            pool.start()  # must not raise
-        # Pool queue is empty — no slots launched.
-        assert pool._q.empty()
+            pool.start()
+        assert pool._started is True
+        assert pool._slot_registry == {}

     def test_start_only_runs_once(self):
-        """Calling start() twice must not double-warm."""
         from app.platforms.ebay.browser_pool import BrowserPool

         pool = BrowserPool(size=1)
@ -107,47 +95,46 @@ class TestLifecycle:
             pool.start()
         assert pool._started is True

-    def test_stop_drains_queue(self):
-        """stop() should close every slot in the queue."""
+    def test_stop_closes_all_registry_slots(self):
         from app.platforms.ebay.browser_pool import BrowserPool

         pool = BrowserPool(size=2)
         slot1 = _make_fake_slot()
         slot2 = _make_fake_slot()
-        pool._q.put(slot1)
-        pool._q.put(slot2)
+        pool._slot_registry[1001] = slot1
+        pool._slot_registry[1002] = slot2

         with patch("app.platforms.ebay.browser_pool._close_slot") as mock_close:
             pool.stop()

         assert mock_close.call_count == 2
-        assert pool._q.empty()
+        assert pool._slot_registry == {}
         assert pool._stopped is True

-    def test_stop_on_empty_pool_is_safe(self):
+    def test_stop_on_empty_registry_is_safe(self):
         from app.platforms.ebay.browser_pool import BrowserPool
-        pool = BrowserPool(size=2)
-        pool.stop()  # must not raise
+        BrowserPool(size=2).stop()


 # ---------------------------------------------------------------------------
-# fetch_html — pool hit path
+# fetch_html — thread-local slot hit path
 # ---------------------------------------------------------------------------

-class TestFetchHtmlPoolHit:
-    def test_uses_pooled_slot_and_replenishes(self):
-        """fetch_html should borrow a slot, call _fetch_with_slot, replenish."""
+class TestFetchHtmlSlotHit:
+    def test_uses_existing_slot_and_replenishes(self):
         from app.platforms.ebay.browser_pool import BrowserPool
+        import app.platforms.ebay.browser_pool as _mod

         pool = BrowserPool(size=1)
         slot = _make_fake_slot()
-        pool._q.put(slot)
+        _mod._thread_local.slot = slot
         fresh_slot = _make_fake_slot()

         with (
             patch.object(pool, "_fetch_with_slot", return_value="<html>ok</html>") as mock_fetch,
-            patch("app.platforms.ebay.browser_pool._replenish_slot", return_value=fresh_slot) as mock_replenish,
+            patch("app.platforms.ebay.browser_pool._replenish_slot", return_value=fresh_slot),
+            patch.object(pool, "_register_slot") as mock_register,
             patch("time.sleep"),
         ):
             html = pool.fetch_html("https://www.ebay.com/sch/i.html?_nkw=test", delay=0)
@ -157,21 +144,19 @@ class TestFetchHtmlPoolHit:
             slot, "https://www.ebay.com/sch/i.html?_nkw=test",
             wait_for_selector=None, wait_for_timeout_ms=2000,
         )
-        mock_replenish.assert_called_once_with(slot)
-        # Fresh slot returned to queue
-        assert pool._q.get_nowait() is fresh_slot
+        mock_register.assert_called_once_with(fresh_slot)

     def test_delay_is_respected(self):
-        """fetch_html must call time.sleep(delay)."""
         from app.platforms.ebay.browser_pool import BrowserPool
+        import app.platforms.ebay.browser_pool as _mod

         pool = BrowserPool(size=1)
-        slot = _make_fake_slot()
-        pool._q.put(slot)
+        _mod._thread_local.slot = _make_fake_slot()

         with (
             patch.object(pool, "_fetch_with_slot", return_value="<html/>"),
             patch("app.platforms.ebay.browser_pool._replenish_slot", return_value=_make_fake_slot()),
+            patch.object(pool, "_register_slot"),
             patch("app.platforms.ebay.browser_pool.time") as mock_time,
         ):
             pool.fetch_html("https://example.com", delay=1.5)
@ -180,22 +165,19 @@ class TestFetchHtmlPoolHit:
 # ---------------------------------------------------------------------------
-# fetch_html — pool empty / fallback path
+# fetch_html — no slot / fallback path
 # ---------------------------------------------------------------------------

 class TestFetchHtmlFallback:
-    def test_falls_back_to_fresh_browser_when_pool_empty(self):
-        """When pool is empty after timeout, _fetch_fresh should be called."""
+    def test_falls_back_when_no_slot_and_playwright_unavailable(self):
         from app.platforms.ebay.browser_pool import BrowserPool

         pool = BrowserPool(size=1)
-        # Queue is empty — no slots available.
+        # No thread-local slot; playwright unavailable → _get_or_create returns None.

         with (
+            patch.object(pool, "_get_or_create_thread_slot", return_value=None),
             patch.object(pool, "_fetch_fresh", return_value="<html>fresh</html>") as mock_fresh,
             patch("time.sleep"),
-            # Make Queue.get raise Empty after a short wait.
-            patch.object(pool._q, "get", side_effect=queue.Empty),
         ):
             html = pool.fetch_html("https://www.ebay.com/sch/i.html?_nkw=widget", delay=0)
@ -206,17 +188,18 @@ class TestFetchHtmlFallback:
         )

     def test_falls_back_when_pooled_fetch_raises(self):
-        """If _fetch_with_slot raises, the slot is closed and _fetch_fresh is used."""
         from app.platforms.ebay.browser_pool import BrowserPool
+        import app.platforms.ebay.browser_pool as _mod

         pool = BrowserPool(size=1)
         slot = _make_fake_slot()
-        pool._q.put(slot)
+        _mod._thread_local.slot = slot

         with (
             patch.object(pool, "_fetch_with_slot", side_effect=RuntimeError("Chromium crashed")),
             patch.object(pool, "_fetch_fresh", return_value="<html>recovered</html>") as mock_fresh,
             patch("app.platforms.ebay.browser_pool._close_slot") as mock_close,
+            patch.object(pool, "_unregister_slot"),
             patch("time.sleep"),
         ):
             html = pool.fetch_html("https://www.ebay.com/", delay=0)
@ -226,19 +209,107 @@ class TestFetchHtmlFallback:
         mock_fresh.assert_called_once()


+# ---------------------------------------------------------------------------
+# Thread-local slot management
+# ---------------------------------------------------------------------------
+
+class TestThreadLocalSlotManagement:
+    def test_get_or_create_returns_existing_slot(self):
+        import app.platforms.ebay.browser_pool as _mod
+        from app.platforms.ebay.browser_pool import BrowserPool
+
+        pool = BrowserPool(size=1)
+        pool._playwright_available = True
+        existing = _make_fake_slot()
+        _mod._thread_local.slot = existing
+
+        result = pool._get_or_create_thread_slot()
+        assert result is existing
+
+    def test_get_or_create_launches_new_slot_when_absent(self):
+        import app.platforms.ebay.browser_pool as _mod
+        from app.platforms.ebay.browser_pool import BrowserPool
+
+        pool = BrowserPool(size=1)
+        pool._playwright_available = True
+        _mod._thread_local.slot = None
+        new_slot = _make_fake_slot()
+
+        with (
+            patch("app.platforms.ebay.browser_pool._launch_slot", return_value=new_slot),
+            patch.object(pool, "_register_slot") as mock_register,
+        ):
+            result = pool._get_or_create_thread_slot()
+
+        assert result is new_slot
+        mock_register.assert_called_once_with(new_slot)
+
+    def test_get_or_create_returns_none_when_playwright_unavailable(self):
+        from app.platforms.ebay.browser_pool import BrowserPool
+
+        pool = BrowserPool(size=1)
+        pool._playwright_available = False
+        assert pool._get_or_create_thread_slot() is None
+
+    def test_register_slot_sets_thread_local_and_registry(self):
+        import app.platforms.ebay.browser_pool as _mod
+        from app.platforms.ebay.browser_pool import BrowserPool
+
+        pool = BrowserPool(size=1)
+        slot = _make_fake_slot()
+        pool._register_slot(slot)
+
+        assert _mod._thread_local.slot is slot
+        assert threading.get_ident() in pool._slot_registry
+
+    def test_unregister_slot_clears_thread_local_and_registry(self):
+        import app.platforms.ebay.browser_pool as _mod
+        from app.platforms.ebay.browser_pool import BrowserPool
+
+        pool = BrowserPool(size=1)
+        slot = _make_fake_slot()
+        pool._register_slot(slot)
+        pool._unregister_slot()
+
+        assert getattr(_mod._thread_local, "slot", None) is None
+        assert threading.get_ident() not in pool._slot_registry
+
+    def test_different_threads_get_independent_slots(self):
+        from app.platforms.ebay.browser_pool import BrowserPool
+
+        pool = BrowserPool(size=2)
+        pool._playwright_available = True
+        slots_seen: list = []
+        errors: list = []
+
+        def worker():
+            new_slot = _make_fake_slot()
+            with patch("app.platforms.ebay.browser_pool._launch_slot", return_value=new_slot):
+                s = pool._get_or_create_thread_slot()
+                slots_seen.append(s)
+
+        t1 = threading.Thread(target=worker)
+        t2 = threading.Thread(target=worker)
+        t1.start(); t2.start()
+        t1.join(); t2.join()
+
+        assert len(slots_seen) == 2
+        # Each thread got its own slot object (they may differ or coincidentally share
+        # the same mock; what matters is both threads succeeded without interference).
+        assert all(s is not None for s in slots_seen)
+
+
 # ---------------------------------------------------------------------------
 # ImportError graceful fallback
 # ---------------------------------------------------------------------------

 class TestImportErrorHandling:
     def test_check_playwright_returns_false_on_import_error(self):
-        """_check_playwright should cache False when playwright is not installed."""
         from app.platforms.ebay.browser_pool import BrowserPool

         pool = BrowserPool(size=2)

         with patch.dict("sys.modules", {"playwright": None, "playwright_stealth": None}):
-            # Force re-check by clearing the cached value.
             pool._playwright_available = None
             result = pool._check_playwright()
@ -246,12 +317,11 @@ class TestImportErrorHandling:
         assert pool._playwright_available is False

     def test_start_logs_warning_when_playwright_missing(self, caplog):
-        """start() should log a warning and not crash when Playwright is absent."""
         import logging
         from app.platforms.ebay.browser_pool import BrowserPool

         pool = BrowserPool(size=1)
-        pool._playwright_available = False  # simulate missing
+        pool._playwright_available = False

         with patch.object(pool, "_check_playwright", return_value=False):
             with caplog.at_level(logging.WARNING, logger="app.platforms.ebay.browser_pool"):
@ -260,87 +330,14 @@ class TestImportErrorHandling:
         assert any("not available" in r.message for r in caplog.records)

     def test_fetch_fresh_raises_runtime_error_when_playwright_missing(self):
-        """_fetch_fresh must raise RuntimeError (not ImportError) when PW absent."""
         from app.platforms.ebay.browser_pool import BrowserPool

         pool = BrowserPool(size=1)
         with patch.dict("sys.modules", {"playwright": None, "playwright.sync_api": None}):
             with pytest.raises(RuntimeError, match="Playwright not installed"):
                 pool._fetch_fresh("https://www.ebay.com/")


-# ---------------------------------------------------------------------------
-# Idle cleanup
-# ---------------------------------------------------------------------------
-
-class TestIdleCleanup:
-    def test_idle_cleanup_closes_stale_slots(self):
-        """_idle_cleanup_loop should close slots whose last_used_ts is too old."""
-        from app.platforms.ebay.browser_pool import BrowserPool, _IDLE_TIMEOUT_SECS
-
-        pool = BrowserPool(size=2)
-
-        stale_slot = _make_fake_slot()
-        stale_slot.last_used_ts = time.time() - (_IDLE_TIMEOUT_SECS + 60)
-
-        fresh_slot = _make_fake_slot()
-        fresh_slot.last_used_ts = time.time()
-
-        pool._q.put(stale_slot)
-        pool._q.put(fresh_slot)
-
-        closed_slots = []
-
-        def fake_close(s):
-            closed_slots.append(s)
-
-        with patch("app.platforms.ebay.browser_pool._close_slot", side_effect=fake_close):
-            # Run one cleanup tick directly (not the full loop).
-            now = time.time()
-            idle_cutoff = now - _IDLE_TIMEOUT_SECS
-            kept = []
-            while True:
-                try:
-                    s = pool._q.get_nowait()
-                except queue.Empty:
-                    break
-                if s.last_used_ts < idle_cutoff:
-                    fake_close(s)
-                else:
-                    kept.append(s)
-            for s in kept:
-                pool._q.put(s)
-
-        assert stale_slot in closed_slots
-        assert fresh_slot not in closed_slots
-        assert pool._q.qsize() == 1
-
-    def test_idle_cleanup_loop_stops_when_pool_stopped(self):
-        """Cleanup daemon should exit when _stopped is True."""
-        from app.platforms.ebay.browser_pool import BrowserPool, _CLEANUP_INTERVAL_SECS
-
-        pool = BrowserPool(size=1)
-        pool._stopped = True
-
-        # The loop should return after one iteration of the while check.
-        # Use a very short sleep mock so the test doesn't actually wait 60s.
-        sleep_calls = []
-
-        def fake_sleep(secs):
-            sleep_calls.append(secs)
-
-        with patch("app.platforms.ebay.browser_pool.time") as mock_time:
-            mock_time.time.return_value = time.time()
-            mock_time.sleep.side_effect = fake_sleep
-            # Run in a thread with a short timeout to confirm it exits.
-            t = threading.Thread(target=pool._idle_cleanup_loop)
-            t.start()
-            t.join(timeout=2.0)
-
-        assert not t.is_alive(), "idle cleanup loop did not exit when _stopped=True"
-
-
 # ---------------------------------------------------------------------------
 # _replenish_slot helper
 # ---------------------------------------------------------------------------
@@ -355,12 +352,8 @@ class TestReplenishSlot:
browser.new_context.return_value = new_ctx browser.new_context.return_value = new_ctx
slot = _PooledBrowser( slot = _PooledBrowser(
xvfb=MagicMock(), xvfb=MagicMock(), pw=MagicMock(), browser=browser,
pw=MagicMock(), ctx=old_ctx, display_num=101, last_used_ts=time.time() - 10,
browser=browser,
ctx=old_ctx,
display_num=101,
last_used_ts=time.time() - 10,
) )
result = _replenish_slot(slot) result = _replenish_slot(slot)
@@ -370,7 +363,6 @@ class TestReplenishSlot:
assert result.ctx is new_ctx assert result.ctx is new_ctx
assert result.browser is browser assert result.browser is browser
assert result.xvfb is slot.xvfb assert result.xvfb is slot.xvfb
# last_used_ts is refreshed
assert result.last_used_ts > slot.last_used_ts assert result.last_used_ts > slot.last_used_ts
@@ -391,7 +383,6 @@ class TestCloseSlot:
xvfb=xvfb, pw=pw, browser=browser, ctx=ctx, xvfb=xvfb, pw=pw, browser=browser, ctx=ctx,
display_num=102, last_used_ts=time.time(), display_num=102, last_used_ts=time.time(),
) )
_close_slot(slot) _close_slot(slot)
ctx.close.assert_called_once() ctx.close.assert_called_once()
@@ -401,7 +392,6 @@ class TestCloseSlot:
xvfb.wait.assert_called_once() xvfb.wait.assert_called_once()
def test_close_slot_ignores_exceptions(self): def test_close_slot_ignores_exceptions(self):
"""_close_slot must not raise even if components throw."""
from app.platforms.ebay.browser_pool import _close_slot, _PooledBrowser from app.platforms.ebay.browser_pool import _close_slot, _PooledBrowser
xvfb = MagicMock(spec=subprocess.Popen) xvfb = MagicMock(spec=subprocess.Popen)
@@ -418,7 +408,6 @@ class TestCloseSlot:
xvfb=xvfb, pw=pw, browser=browser, ctx=ctx, xvfb=xvfb, pw=pw, browser=browser, ctx=ctx,
display_num=103, last_used_ts=time.time(), display_num=103, last_used_ts=time.time(),
) )
_close_slot(slot) # must not raise _close_slot(slot) # must not raise
@@ -428,7 +417,6 @@ class TestCloseSlot:
class TestScraperUsesPool: class TestScraperUsesPool:
def test_fetch_url_delegates_to_pool(self): def test_fetch_url_delegates_to_pool(self):
"""ScrapedEbayAdapter._fetch_url must use the pool, not launch its own browser."""
from app.platforms.ebay.browser_pool import BrowserPool from app.platforms.ebay.browser_pool import BrowserPool
from app.platforms.ebay.scraper import ScrapedEbayAdapter from app.platforms.ebay.scraper import ScrapedEbayAdapter
from app.db.store import Store from app.db.store import Store
@@ -440,7 +428,6 @@ class TestScraperUsesPool:
fake_pool.fetch_html.return_value = "<html>pooled</html>" fake_pool.fetch_html.return_value = "<html>pooled</html>"
with patch("app.platforms.ebay.browser_pool.get_pool", return_value=fake_pool): with patch("app.platforms.ebay.browser_pool.get_pool", return_value=fake_pool):
# Clear the cache so fetch_url actually hits the pool.
import app.platforms.ebay.scraper as scraper_mod import app.platforms.ebay.scraper as scraper_mod
scraper_mod._html_cache.clear() scraper_mod._html_cache.clear()
html = adapter._fetch_url("https://www.ebay.com/sch/i.html?_nkw=test") html = adapter._fetch_url("https://www.ebay.com/sch/i.html?_nkw=test")
@@ -451,7 +438,6 @@ class TestScraperUsesPool:
) )
def test_fetch_url_uses_cache_before_pool(self): def test_fetch_url_uses_cache_before_pool(self):
"""_fetch_url should return cached HTML without hitting the pool."""
from app.platforms.ebay.scraper import ScrapedEbayAdapter, _html_cache, _HTML_CACHE_TTL from app.platforms.ebay.scraper import ScrapedEbayAdapter, _html_cache, _HTML_CACHE_TTL
from app.db.store import Store from app.db.store import Store
@@ -467,6 +453,4 @@ class TestScraperUsesPool:
assert html == "<html>cached</html>" assert html == "<html>cached</html>"
fake_pool.fetch_html.assert_not_called() fake_pool.fetch_html.assert_not_called()
# Cleanup
_html_cache.pop(url, None) _html_cache.pop(url, None)


@@ -1,4 +1,4 @@
"""Unit tests for QueryTranslator — LLMRouter mocked at boundary.""" """Unit tests for QueryTranslator — LLMRouter and cf-orch backends mocked at boundary."""
from __future__ import annotations from __future__ import annotations
import json import json
@@ -73,7 +73,7 @@ def test_parse_response_missing_required_field():
_parse_response(raw) _parse_response(raw)
# ── QueryTranslator (integration with mocked LLMRouter) ────────────────────── # ── Fixtures ──────────────────────────────────────────────────────────────────
from app.platforms.ebay.categories import EbayCategoryCache from app.platforms.ebay.categories import EbayCategoryCache
from circuitforge_core.db import get_connection, run_migrations from circuitforge_core.db import get_connection, run_migrations
@@ -88,16 +88,7 @@ def db_with_categories(tmp_path):
return conn return conn
def _make_translator(db_conn, llm_response: str) -> QueryTranslator: _VALID_LLM_RESPONSE = json.dumps({
from app.platforms.ebay.categories import EbayCategoryCache
cache = EbayCategoryCache(db_conn)
mock_router = MagicMock()
mock_router.complete.return_value = llm_response
return QueryTranslator(category_cache=cache, llm_router=mock_router)
def test_translate_returns_search_params(db_with_categories):
llm_out = json.dumps({
"base_query": "RTX 3080", "base_query": "RTX 3080",
"must_include_mode": "groups", "must_include_mode": "groups",
"must_include": "rtx|geforce, 3080", "must_include": "rtx|geforce, 3080",
@@ -107,8 +98,21 @@ def test_translate_returns_search_params(db_with_categories):
"condition": ["used"], "condition": ["used"],
"category_id": "27386", "category_id": "27386",
"explanation": "Searching for used RTX 3080 GPUs under $300.", "explanation": "Searching for used RTX 3080 GPUs under $300.",
}) })
t = _make_translator(db_with_categories, llm_out)
# ── Local LLMRouter backend ───────────────────────────────────────────────────
def _make_local_translator(db_conn, llm_response: str) -> QueryTranslator:
from app.platforms.ebay.categories import EbayCategoryCache
cache = EbayCategoryCache(db_conn)
mock_router = MagicMock()
mock_router.complete.return_value = llm_response
return QueryTranslator(category_cache=cache, llm_router=mock_router)
def test_translate_returns_search_params(db_with_categories):
t = _make_local_translator(db_with_categories, _VALID_LLM_RESPONSE)
result = t.translate("used RTX 3080 under $300 no mining") result = t.translate("used RTX 3080 under $300 no mining")
assert result.base_query == "RTX 3080" assert result.base_query == "RTX 3080"
assert result.max_price == 300.0 assert result.max_price == 300.0
@@ -116,18 +120,7 @@ def test_translate_returns_search_params(db_with_categories):
def test_translate_injects_category_hints(db_with_categories): def test_translate_injects_category_hints(db_with_categories):
"""The system prompt sent to the LLM must contain category_id hints.""" """The system prompt sent to the LLM must contain category_id hints."""
llm_out = json.dumps({ t = _make_local_translator(db_with_categories, _VALID_LLM_RESPONSE)
"base_query": "GPU",
"must_include_mode": "all",
"must_include": "",
"must_exclude": "",
"max_price": None,
"min_price": None,
"condition": [],
"category_id": None,
"explanation": "Searching for GPUs.",
})
t = _make_translator(db_with_categories, llm_out)
t.translate("GPU") t.translate("GPU")
call_args = t._llm_router.complete.call_args call_args = t._llm_router.complete.call_args
system_prompt = call_args.kwargs.get("system") or call_args.args[1] system_prompt = call_args.kwargs.get("system") or call_args.args[1]
@@ -141,7 +134,7 @@ def test_translate_empty_category_cache_still_works(tmp_path):
conn = get_connection(tmp_path / "empty.db") conn = get_connection(tmp_path / "empty.db")
run_migrations(conn, Path("app/db/migrations")) run_migrations(conn, Path("app/db/migrations"))
# Do NOT seed bootstrap — empty cache # Do NOT seed bootstrap — empty cache
llm_out = json.dumps({ t = _make_local_translator(conn, json.dumps({
"base_query": "vinyl", "base_query": "vinyl",
"must_include_mode": "all", "must_include_mode": "all",
"must_include": "", "must_include": "",
@@ -151,8 +144,7 @@ def test_translate_empty_category_cache_still_works(tmp_path):
"condition": [], "condition": [],
"category_id": None, "category_id": None,
"explanation": "Searching for vinyl records.", "explanation": "Searching for vinyl records.",
}) }))
t = _make_translator(conn, llm_out)
result = t.translate("vinyl records") result = t.translate("vinyl records")
assert result.base_query == "vinyl" assert result.base_query == "vinyl"
call_args = t._llm_router.complete.call_args call_args = t._llm_router.complete.call_args
@@ -168,3 +160,101 @@ def test_translate_llm_error_raises_query_translator_error(db_with_categories):
t = QueryTranslator(category_cache=cache, llm_router=mock_router) t = QueryTranslator(category_cache=cache, llm_router=mock_router)
with pytest.raises(QueryTranslatorError, match="LLM backend"): with pytest.raises(QueryTranslatorError, match="LLM backend"):
t.translate("used GPU") t.translate("used GPU")
# ── cf-orch backend ───────────────────────────────────────────────────────────
def _make_orch_translator(db_conn) -> QueryTranslator:
from app.platforms.ebay.categories import EbayCategoryCache
cache = EbayCategoryCache(db_conn)
return QueryTranslator(category_cache=cache, cforch_url="http://orch.local:8700")
def _mock_alloc_response() -> MagicMock:
resp = MagicMock()
resp.json.return_value = {
"url": "http://cf-text.local:11434",
"allocation_id": "alloc-abc123",
"node_id": "heimdall",
}
resp.raise_for_status.return_value = None
return resp
def _mock_chat_response(content: str) -> MagicMock:
resp = MagicMock()
resp.json.return_value = {
"choices": [{"message": {"content": content}}]
}
resp.raise_for_status.return_value = None
return resp
def _mock_delete_response() -> MagicMock:
resp = MagicMock()
resp.raise_for_status.return_value = None
return resp
def test_orch_translate_returns_search_params(db_with_categories):
t = _make_orch_translator(db_with_categories)
with patch("httpx.post") as mock_post, patch("httpx.delete") as mock_delete:
mock_post.side_effect = [
_mock_alloc_response(),
_mock_chat_response(_VALID_LLM_RESPONSE),
]
mock_delete.return_value = _mock_delete_response()
result = t.translate("used RTX 3080 under $300")
assert result.base_query == "RTX 3080"
assert result.max_price == 300.0
def test_orch_allocates_with_correct_task_tag(db_with_categories):
t = _make_orch_translator(db_with_categories)
with patch("httpx.post") as mock_post, patch("httpx.delete"):
mock_post.side_effect = [
_mock_alloc_response(),
_mock_chat_response(_VALID_LLM_RESPONSE),
]
t.translate("GPU")
alloc_call = mock_post.call_args_list[0]
assert alloc_call.args[0] == "http://orch.local:8700/api/inference/task"
body = alloc_call.kwargs.get("json") or alloc_call.args[1]
assert body == {"product": "snipe", "task": "query_translation"}
def test_orch_releases_allocation_after_success(db_with_categories):
t = _make_orch_translator(db_with_categories)
with patch("httpx.post") as mock_post, patch("httpx.delete") as mock_delete:
mock_post.side_effect = [
_mock_alloc_response(),
_mock_chat_response(_VALID_LLM_RESPONSE),
]
mock_delete.return_value = _mock_delete_response()
t.translate("GPU")
mock_delete.assert_called_once()
delete_url = mock_delete.call_args.args[0]
assert "alloc-abc123" in delete_url
def test_orch_releases_allocation_on_inference_failure(db_with_categories):
"""Allocation must be released even when the inference call fails."""
t = _make_orch_translator(db_with_categories)
with patch("httpx.post") as mock_post, patch("httpx.delete") as mock_delete:
mock_post.side_effect = [
_mock_alloc_response(),
Exception("inference timeout"),
]
mock_delete.return_value = _mock_delete_response()
with pytest.raises(QueryTranslatorError, match="LLM backend"):
t.translate("GPU")
mock_delete.assert_called_once()
def test_init_requires_at_least_one_backend(tmp_path):
from circuitforge_core.db import get_connection, run_migrations
conn = get_connection(tmp_path / "test.db")
run_migrations(conn, Path("app/db/migrations"))
cache = EbayCategoryCache(conn)
with pytest.raises(ValueError, match="cforch_url or llm_router"):
QueryTranslator(category_cache=cache)
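
Reviewer note: the cf-orch tests above pin down an allocate, infer, release sequence in which the allocation is released even when inference fails. A minimal sketch of that pattern follows. The helper name `run_with_allocation` and its three callables are hypothetical stand-ins for the HTTP calls, not the actual `QueryTranslator` code, and unlike the real code this sketch does not wrap the failure in `QueryTranslatorError`.

```python
from typing import Callable


def run_with_allocation(
    allocate: Callable[[], dict],
    infer: Callable[[str], str],
    release: Callable[[str], None],
) -> str:
    """Allocate a backend, run inference, and always release the allocation.

    All three callables are hypothetical; in the tests they correspond to the
    mocked POST (allocate), POST (chat), and DELETE (release) requests.
    """
    alloc = allocate()  # expected keys: "url" and "allocation_id"
    try:
        return infer(alloc["url"])
    finally:
        # Runs on success and on failure, so the allocation is never leaked.
        release(alloc["allocation_id"])


released: list = []


def fake_allocate() -> dict:
    # Values copied from the mocked allocation response in the tests.
    return {"url": "http://cf-text.local:11434", "allocation_id": "alloc-abc123"}


def failing_infer(url: str) -> str:
    raise RuntimeError("inference timeout")


try:
    run_with_allocation(fake_allocate, failing_infer, released.append)
except RuntimeError:
    pass  # the error still propagates; the release has already happened
```

Putting the release in `finally` rather than in an `except` branch covers the success path and the failure path with a single call site, which is the property `test_orch_releases_allocation_on_inference_failure` checks.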


@@ -4,7 +4,7 @@ from __future__ import annotations
import json import json
import sqlite3 import sqlite3
from pathlib import Path from pathlib import Path
from unittest.mock import patch from unittest.mock import MagicMock, patch, call
import pytest import pytest
@@ -47,6 +47,19 @@ def tmp_db(tmp_path: Path) -> Path:
return db return db
_VISION_JSON = json.dumps({
"is_stock_photo": False,
"visible_damage": False,
"authenticity_signal": "genuine_product_photo",
"confidence": "high",
})
_PARAMS = json.dumps({
"photo_url": "https://example.com/photo.jpg",
"listing_title": "Used iPhone 13",
})
def test_llm_task_types_defined(): def test_llm_task_types_defined():
assert "trust_photo_analysis" in LLM_TASK_TYPES assert "trust_photo_analysis" in LLM_TASK_TYPES
@@ -75,29 +88,17 @@ def test_insert_task_dedup(tmp_db: Path):
assert new2 is False assert new2 is False
def test_run_task_photo_analysis_success(tmp_db: Path): # ── Local LLMRouter path ──────────────────────────────────────────────────────
"""Vision analysis result is written to trust_scores.photo_analysis_json."""
params = json.dumps({
"listing_id": 1,
"photo_url": "https://example.com/photo.jpg",
"listing_title": "Used iPhone 13",
})
task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=params)
vision_result = { def test_run_task_photo_analysis_local_success(tmp_db: Path):
"is_stock_photo": False, """Local path: vision result is written to trust_scores.photo_analysis_json."""
"visible_damage": False, task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS)
"authenticity_signal": "genuine_product_photo",
"confidence": "high",
}
with patch("app.tasks.runner.requests") as mock_req, \ with patch("app.tasks.runner.requests") as mock_req, \
patch("app.tasks.runner.LLMRouter") as MockRouter: patch("app.tasks.runner._assess_via_local_llm", return_value=_VISION_JSON):
mock_req.get.return_value.content = b"fake_image_bytes" mock_req.get.return_value.content = b"fake_image_bytes"
mock_req.get.return_value.raise_for_status = lambda: None mock_req.get.return_value.raise_for_status = lambda: None
instance = MockRouter.return_value run_task(tmp_db, task_id, "trust_photo_analysis", 1, _PARAMS)
instance.complete.return_value = json.dumps(vision_result)
run_task(tmp_db, task_id, "trust_photo_analysis", 1, params)
conn = sqlite3.connect(tmp_db) conn = sqlite3.connect(tmp_db)
score_row = conn.execute( score_row = conn.execute(
@@ -110,20 +111,16 @@ def test_run_task_photo_analysis_success(tmp_db: Path):
assert task_row[0] == "completed" assert task_row[0] == "completed"
parsed = json.loads(score_row[0]) parsed = json.loads(score_row[0])
assert parsed["is_stock_photo"] is False assert parsed["is_stock_photo"] is False
assert parsed["confidence"] == "high"
def test_run_task_photo_fetch_failure_marks_failed(tmp_db: Path): def test_run_task_photo_fetch_failure_marks_failed(tmp_db: Path):
"""If photo download fails, task is marked failed without crashing.""" """If photo download fails, task is marked failed without crashing."""
params = json.dumps({ task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS)
"listing_id": 1,
"photo_url": "https://example.com/bad.jpg",
"listing_title": "Laptop",
})
task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=params)
with patch("app.tasks.runner.requests") as mock_req: with patch("app.tasks.runner.requests") as mock_req:
mock_req.get.side_effect = ConnectionError("fetch failed") mock_req.get.side_effect = ConnectionError("fetch failed")
run_task(tmp_db, task_id, "trust_photo_analysis", 1, params) run_task(tmp_db, task_id, "trust_photo_analysis", 1, _PARAMS)
conn = sqlite3.connect(tmp_db) conn = sqlite3.connect(tmp_db)
row = conn.execute( row = conn.execute(
@@ -156,3 +153,169 @@ def test_run_task_unknown_type_marks_failed(tmp_db: Path):
).fetchone() ).fetchone()
conn.close() conn.close()
assert row[0] == "failed" assert row[0] == "failed"
# ── cf-orch path ──────────────────────────────────────────────────────────────
def _make_orch_client_mock(vision_json: str) -> MagicMock:
"""Build a CFOrchClient mock whose task_allocate context manager returns an Allocation."""
alloc = MagicMock()
alloc.url = "http://cf-vlm.local:8000"
alloc.model = "bartowski--qwen2-vl-7b-instruct-gguf"
cm = MagicMock()
cm.__enter__ = MagicMock(return_value=alloc)
cm.__exit__ = MagicMock(return_value=False)
client = MagicMock()
client.task_allocate.return_value = cm
return client
def test_run_task_photo_analysis_orch_success(tmp_db: Path):
"""Cloud path: CFOrchClient.task_allocate is used when GPU_SERVER_URL is set."""
task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS)
chat_resp = MagicMock()
chat_resp.json.return_value = {"choices": [{"message": {"content": _VISION_JSON}}]}
chat_resp.raise_for_status = MagicMock()
with patch("app.tasks.runner.requests") as mock_req, \
patch.dict("os.environ", {"GPU_SERVER_URL": "http://cf-orch.local:8700"}), \
patch("app.tasks.runner.httpx") as mock_httpx, \
patch("circuitforge_orch.client.CFOrchClient") as MockClient:
mock_req.get.return_value.content = b"fake_image_bytes"
mock_req.get.return_value.raise_for_status = lambda: None
mock_httpx.post.return_value = chat_resp
client_instance = _make_orch_client_mock(_VISION_JSON)
MockClient.return_value = client_instance
run_task(tmp_db, task_id, "trust_photo_analysis", 1, _PARAMS)
conn = sqlite3.connect(tmp_db)
score_row = conn.execute(
"SELECT photo_analysis_json FROM trust_scores WHERE listing_id=1"
).fetchone()
task_row = conn.execute(
"SELECT status FROM background_tasks WHERE id=?", (task_id,)
).fetchone()
conn.close()
assert task_row[0] == "completed"
parsed = json.loads(score_row[0])
assert parsed["authenticity_signal"] == "genuine_product_photo"
def test_run_task_photo_analysis_orch_uses_image_assessment_task(tmp_db: Path):
"""task_allocate must be called with product='snipe', task='image_assessment'."""
task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS)
chat_resp = MagicMock()
chat_resp.json.return_value = {"choices": [{"message": {"content": _VISION_JSON}}]}
chat_resp.raise_for_status = MagicMock()
with patch("app.tasks.runner.requests") as mock_req, \
patch.dict("os.environ", {"GPU_SERVER_URL": "http://cf-orch.local:8700"}), \
patch("app.tasks.runner.httpx") as mock_httpx, \
patch("circuitforge_orch.client.CFOrchClient") as MockClient:
mock_req.get.return_value.content = b"fake_image_bytes"
mock_req.get.return_value.raise_for_status = lambda: None
mock_httpx.post.return_value = chat_resp
client_instance = _make_orch_client_mock(_VISION_JSON)
MockClient.return_value = client_instance
run_task(tmp_db, task_id, "trust_photo_analysis", 1, _PARAMS)
client_instance.task_allocate.assert_called_once_with("snipe", "image_assessment")
def test_run_task_photo_analysis_orch_sends_image_url_content(tmp_db: Path):
"""Vision payload must include image_url content block with data URI."""
task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS)
captured_body: dict = {}
def capture_post(url, **kwargs):
nonlocal captured_body
if "/v1/chat/completions" in url:
captured_body = kwargs.get("json", {})
resp = MagicMock()
resp.json.return_value = {"choices": [{"message": {"content": _VISION_JSON}}]}
resp.raise_for_status = MagicMock()
return resp
with patch("app.tasks.runner.requests") as mock_req, \
patch.dict("os.environ", {"GPU_SERVER_URL": "http://cf-orch.local:8700"}), \
patch("app.tasks.runner.httpx") as mock_httpx, \
patch("circuitforge_orch.client.CFOrchClient") as MockClient:
mock_req.get.return_value.content = b"fake_image_bytes"
mock_req.get.return_value.raise_for_status = lambda: None
mock_httpx.post.side_effect = capture_post
client_instance = _make_orch_client_mock(_VISION_JSON)
MockClient.return_value = client_instance
run_task(tmp_db, task_id, "trust_photo_analysis", 1, _PARAMS)
user_content = captured_body["messages"][1]["content"]
image_blocks = [b for b in user_content if b.get("type") == "image_url"]
assert image_blocks, "No image_url content block found in vision payload"
url = image_blocks[0]["image_url"]["url"]
assert url.startswith("data:image/jpeg;base64,"), f"Unexpected image URL format: {url[:40]}"
def test_run_task_photo_analysis_orch_task_not_found_falls_back(tmp_db: Path):
"""TaskNotFound from cf-orch → graceful fallback to local LLMRouter."""
from circuitforge_orch.client import TaskNotFound
task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS)
cm = MagicMock()
cm.__enter__ = MagicMock(side_effect=TaskNotFound("snipe", "image_assessment"))
cm.__exit__ = MagicMock(return_value=False)
client_instance = MagicMock()
client_instance.task_allocate.return_value = cm
with patch("app.tasks.runner.requests") as mock_req, \
patch.dict("os.environ", {"GPU_SERVER_URL": "http://cf-orch.local:8700"}), \
patch("circuitforge_orch.client.CFOrchClient", return_value=client_instance), \
patch("app.tasks.runner._assess_via_local_llm", return_value=_VISION_JSON) as mock_local:
mock_req.get.return_value.content = b"fake_image_bytes"
mock_req.get.return_value.raise_for_status = lambda: None
run_task(tmp_db, task_id, "trust_photo_analysis", 1, _PARAMS)
mock_local.assert_called_once()
conn = sqlite3.connect(tmp_db)
task_row = conn.execute(
"SELECT status FROM background_tasks WHERE id=?", (task_id,)
).fetchone()
conn.close()
assert task_row[0] == "completed"
def test_run_task_photo_analysis_non_json_response_writes_raw(tmp_db: Path):
"""Non-JSON LLM response is stored with parse_error flag rather than crashing."""
task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS)
with patch("app.tasks.runner.requests") as mock_req, \
patch("app.tasks.runner._assess_via_local_llm", return_value="not valid json at all"):
mock_req.get.return_value.content = b"fake_image_bytes"
mock_req.get.return_value.raise_for_status = lambda: None
run_task(tmp_db, task_id, "trust_photo_analysis", 1, _PARAMS)
conn = sqlite3.connect(tmp_db)
score_row = conn.execute(
"SELECT photo_analysis_json FROM trust_scores WHERE listing_id=1"
).fetchone()
conn.close()
parsed = json.loads(score_row[0])
assert parsed.get("parse_error") is True
assert "raw_response" in parsed
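
Reviewer note: the last test above expects a non-JSON model reply to be stored with a `parse_error` flag and the raw text instead of failing the task. A minimal sketch of that behavior follows. The function name `normalize_vision_response` is hypothetical, and treating valid JSON that is not an object as a parse error is an assumption the tests do not cover.

```python
import json


def normalize_vision_response(raw: str) -> str:
    """Return a JSON string for storage; wrap unparseable output instead of raising.

    Hypothetical helper, not the runner's actual code.
    """
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        parsed = None
    if not isinstance(parsed, dict):
        # Assumption: anything other than a JSON object is treated as unparseable.
        return json.dumps({"parse_error": True, "raw_response": raw})
    return json.dumps(parsed)
```

Storing the raw reply alongside the flag keeps the failure inspectable later without marking the whole background task as failed.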


@@ -296,3 +296,37 @@ def test_non_retailer_does_not_suppress_duplicate_photo():
) )
result = agg.aggregate(_ALL_20.copy(), photo_hash_duplicate=True, seller=seller) result = agg.aggregate(_ALL_20.copy(), photo_hash_duplicate=True, seller=seller)
assert "duplicate_photo" in result.red_flags_json assert "duplicate_photo" in result.red_flags_json
# ── #52: buyer-only / returning seller (ratio=0.0, count>0) ──────────────────
def test_zero_ratio_with_count_gives_no_recent_seller_data_flag():
"""Seller with 117 lifetime feedbacks (buyer-only) has ratio=0.0 parsed from page.
Must get no_recent_seller_data soft flag, NOT established_bad_actor."""
agg = Aggregator()
scores = {k: 10 for k in ["account_age", "feedback_count",
"feedback_ratio", "price_vs_market", "category_history"]}
buyer_only = Seller(
platform="ebay", platform_seller_id="u", username="jjcpryz",
account_age_days=1200, feedback_count=117, feedback_ratio=0.0,
category_history_json="{}",
)
result = agg.aggregate(scores, photo_hash_duplicate=False, seller=buyer_only)
assert "no_recent_seller_data" in result.red_flags_json
assert "established_bad_actor" not in result.red_flags_json
def test_established_bad_actor_still_fires_for_genuinely_bad_ratio():
"""ratio=0.75 (not zero) with moderate count → established_bad_actor still fires."""
agg = Aggregator()
scores = {k: 10 for k in ["account_age", "feedback_count",
"feedback_ratio", "price_vs_market", "category_history"]}
bad = Seller(
platform="ebay", platform_seller_id="u", username="u",
account_age_days=500, feedback_count=100, feedback_ratio=0.75,
category_history_json="{}",
)
result = agg.aggregate(scores, photo_hash_duplicate=False, seller=bad)
assert "established_bad_actor" in result.red_flags_json
assert "no_recent_seller_data" not in result.red_flags_json


@@ -43,3 +43,26 @@ def test_no_market_data_returns_none():
scores = scorer.score(_seller(), market_median=None, listing_price=950.0) scores = scorer.score(_seller(), market_median=None, listing_price=950.0)
# None signals "data unavailable" — aggregator will set score_is_partial=True # None signals "data unavailable" — aggregator will set score_is_partial=True
assert scores["price_vs_market"] is None assert scores["price_vs_market"] is None
def test_zero_ratio_with_nonzero_count_returns_none():
"""ratio=0.0 with count>0 means eBay didn't show a 12-month percentage.
Must return None (missing data) not 0 (catastrophically bad)."""
scorer = MetadataScorer()
scores = scorer.score(
_seller(feedback_ratio=0.0, feedback_count=117),
market_median=None, listing_price=500.0,
)
assert scores["feedback_ratio"] is None
def test_zero_ratio_with_zero_count_scores_low():
"""feedback_ratio=0.0 with count=0 is a real 'no data at all' case, not missing."""
scorer = MetadataScorer()
scores = scorer.score(
_seller(feedback_ratio=0.0, feedback_count=0),
market_median=None, listing_price=500.0,
)
# count=0 means zero_feedback; ratio=0 with count=0 is the standard no-history path
# (not the "missing 12-month window" path)
assert scores["feedback_ratio"] == 5 # ratio < 0.90 → 5
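
Reviewer note: the two tests above separate "ratio missing" (ratio 0.0 with a nonzero count) from "no history at all" (ratio 0.0 with count 0). A minimal sketch of that rule follows. The function name `score_feedback_ratio` is hypothetical, and the value returned for ratios of 0.90 and above is a placeholder assumption, since only the "ratio < 0.90 gives 5" tier appears in this diff.

```python
from typing import Optional


def score_feedback_ratio(ratio: float, count: int) -> Optional[int]:
    """Hypothetical sketch of the feedback_ratio signal, not MetadataScorer itself."""
    if ratio == 0.0 and count > 0:
        # A 0.0 ratio alongside real feedback means the 12-month percentage
        # was not shown; report missing data rather than a bad score.
        return None
    if ratio < 0.90:
        return 5  # tier taken from the test comment in the diff
    return 20  # placeholder assumption: the higher tiers are not shown here
```

Returning `None` instead of a low number lets the aggregator mark the score as partial, which matches the `no_recent_seller_data` soft flag tested in the aggregator changes.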