diff --git a/README.md b/README.md index 6234658..bb8bcc1 100644 --- a/README.md +++ b/README.md @@ -159,6 +159,7 @@ docker compose -f compose.cloud.yml -p snipe-cloud build api # after Python cha Online auctions are frustrating because: - Winning requires being present at the exact closing moment — sometimes 2 AM - Platforms vary wildly: some allow proxy bids, some don't; closing times extend on activity +- Scammers exploit auction urgency — new accounts, stolen photos, pressure to pay outside platform - Price history is hidden — you don't know if an item is underpriced or a trap - Sellers hide damage in descriptions rather than titles to avoid automated filters - Shipping logistics for large / fragile antiques require coordination with the auction house diff --git a/api/cloud_session.py b/api/cloud_session.py index a977344..467702f 100644 --- a/api/cloud_session.py +++ b/api/cloud_session.py @@ -26,6 +26,7 @@ from dataclasses import dataclass from pathlib import Path from typing import Optional +import jwt as pyjwt import requests from fastapi import Depends, HTTPException, Request @@ -109,7 +110,6 @@ def validate_session_jwt(token: str) -> str: Directus 11+ uses 'id' (not 'sub') for the user UUID in its JWT payload. """ try: - import jwt as pyjwt payload = pyjwt.decode( token, DIRECTUS_JWT_SECRET, diff --git a/api/main.py b/api/main.py index ca9ddef..6bd87a3 100644 --- a/api/main.py +++ b/api/main.py @@ -6,15 +6,19 @@ import hashlib import logging import os from concurrent.futures import ThreadPoolExecutor +from contextlib import asynccontextmanager from pathlib import Path -from fastapi import Depends, FastAPI, HTTPException +import csv +import io +from fastapi import Depends, FastAPI, HTTPException, UploadFile, File +from fastapi.responses import StreamingResponse from pydantic import BaseModel from fastapi.middleware.cors import CORSMiddleware from circuitforge_core.config import load_env from app.db.store import Store -from app.db.models import SavedSearch as SavedSearchModel +from app.db.models import SavedSearch as SavedSearchModel, ScammerEntry from app.platforms import SearchFilters from app.platforms.ebay.scraper import ScrapedEbayAdapter from app.platforms.ebay.adapter import EbayAdapter @@ -28,6 +32,22 @@ load_env(Path(".env")) log = logging.getLogger(__name__) +@asynccontextmanager +async def _lifespan(app: FastAPI): + # Start vision/LLM background task scheduler. + # background_tasks queue lives in shared_db (cloud) or local_db (local) + # so the scheduler has a single stable DB path across all cloud users. + from app.tasks.scheduler import get_scheduler, reset_scheduler + from api.cloud_session import CLOUD_MODE, _LOCAL_SNIPE_DB, _shared_db_path + sched_db = _shared_db_path() if CLOUD_MODE else _LOCAL_SNIPE_DB + get_scheduler(sched_db) + log.info("Snipe task scheduler started (db=%s)", sched_db) + yield + get_scheduler(sched_db).shutdown(timeout=10.0) + reset_scheduler() + log.info("Snipe task scheduler stopped.") + + def _ebay_creds() -> tuple[str, str, str]: """Return (client_id, client_secret, env) from env vars. @@ -43,7 +63,7 @@ def _ebay_creds() -> tuple[str, str, str]: client_secret = (os.environ.get("EBAY_CERT_ID") or os.environ.get("EBAY_CLIENT_SECRET", "")).strip() return client_id, client_secret, env -app = FastAPI(title="Snipe API", version="0.1.0") +app = FastAPI(title="Snipe API", version="0.1.0", lifespan=_lifespan) app.include_router(ebay_webhook_router) app.add_middleware( @@ -111,7 +131,7 @@ def _trigger_scraper_enrichment( seller = shared_store.get_seller("ebay", sid) if not seller: continue - if (seller.account_age_days is None + if ((seller.account_age_days is None or seller.feedback_count == 0) and sid not in needs_btf and len(needs_btf) < _BTF_MAX_PER_SEARCH): needs_btf[sid] = listing.platform_listing_id @@ -145,6 +165,55 @@ def _trigger_scraper_enrichment( t.start() +def _enqueue_vision_tasks( + listings: list, + trust_scores_list: list, + session: "CloudUser", +) -> None: + """Enqueue trust_photo_analysis tasks for listings with photos. + + Runs fire-and-forget: tasks land in the scheduler queue and the response + returns immediately. Results are written back to trust_scores.photo_analysis_json + by the runner when the vision LLM completes. + + session.shared_db: where background_tasks lives (scheduler's DB). + session.user_db: encoded in params so the runner writes to the right + trust_scores table in cloud mode. + """ + import json as _json + from app.tasks.runner import insert_task + from app.tasks.scheduler import get_scheduler + from api.cloud_session import CLOUD_MODE, _shared_db_path, _LOCAL_SNIPE_DB + + sched_db = _shared_db_path() if CLOUD_MODE else _LOCAL_SNIPE_DB + sched = get_scheduler(sched_db) + + enqueued = 0 + for listing, ts in zip(listings, trust_scores_list): + if not listing.photo_urls or not listing.id: + continue + params = _json.dumps({ + "photo_url": listing.photo_urls[0], + "listing_title": listing.title, + "user_db": str(session.user_db), + }) + task_id, is_new = insert_task( + sched_db, "trust_photo_analysis", job_id=listing.id, params=params + ) + if is_new: + ok = sched.enqueue(task_id, "trust_photo_analysis", listing.id, params) + if not ok: + log.warning( + "Vision task queue full — dropped task for listing %s", + listing.platform_listing_id, + ) + else: + enqueued += 1 + + if enqueued: + log.info("Enqueued %d vision analysis task(s)", enqueued) + + def _parse_terms(raw: str) -> list[str]: """Split a comma-separated keyword string into non-empty, stripped terms.""" return [t.strip() for t in raw.split(",") if t.strip()] @@ -293,6 +362,14 @@ def search( scorer = TrustScorer(shared_store) trust_scores_list = scorer.score_batch(listings, q) + # Persist trust scores so background vision tasks have a row to UPDATE. + user_store.save_trust_scores(trust_scores_list) + + # Enqueue vision analysis for listings with photos — Paid tier and above. + features = compute_features(session.tier) + if features.photo_analysis: + _enqueue_vision_tasks(listings, trust_scores_list, session) + query_hash = hashlib.md5(q.encode()).hexdigest() comp = shared_store.get_market_comp("ebay", query_hash) market_price = comp.median_price if comp else None @@ -359,7 +436,9 @@ def enrich_seller( pass # no API creds — fall through to BTF seller_obj = shared_store.get_seller("ebay", seller) - needs_btf = seller_obj is not None and seller_obj.account_age_days is None + needs_btf = seller_obj is not None and ( + seller_obj.account_age_days is None or seller_obj.feedback_count == 0 + ) needs_categories = seller_obj is None or seller_obj.category_history_json in ("{}", "", None) # Slow path: Playwright for remaining gaps (BTF + _ssn in parallel threads). @@ -458,3 +537,95 @@ def delete_saved_search(saved_id: int, session: CloudUser = Depends(get_session) def mark_saved_search_run(saved_id: int, session: CloudUser = Depends(get_session)): Store(session.user_db).update_saved_search_last_run(saved_id) return {"ok": True} + + +# ── Scammer Blocklist ───────────────────────────────────────────────────────── +# Blocklist lives in shared_db: all users on a shared cloud instance see the +# same community blocklist. In local (single-user) mode shared_db == user_db. + +class BlocklistAdd(BaseModel): + platform: str = "ebay" + platform_seller_id: str + username: str + reason: str = "" + + +@app.get("/api/blocklist") +def list_blocklist(session: CloudUser = Depends(get_session)): + store = Store(session.shared_db) + return {"entries": [dataclasses.asdict(e) for e in store.list_blocklist()]} + + +@app.post("/api/blocklist", status_code=201) +def add_to_blocklist(body: BlocklistAdd, session: CloudUser = Depends(get_session)): + store = Store(session.shared_db) + entry = store.add_to_blocklist(ScammerEntry( + platform=body.platform, + platform_seller_id=body.platform_seller_id, + username=body.username, + reason=body.reason or None, + source="manual", + )) + return dataclasses.asdict(entry) + + +@app.delete("/api/blocklist/{platform_seller_id}", status_code=204) +def remove_from_blocklist(platform_seller_id: str, session: CloudUser = Depends(get_session)): + Store(session.shared_db).remove_from_blocklist("ebay", platform_seller_id) + + +@app.get("/api/blocklist/export") +def export_blocklist(session: CloudUser = Depends(get_session)): + """Download the blocklist as a CSV file.""" + entries = Store(session.shared_db).list_blocklist() + buf = io.StringIO() + writer = csv.writer(buf) + writer.writerow(["platform", "platform_seller_id", "username", "reason", "source", "created_at"]) + for e in entries: + writer.writerow([e.platform, e.platform_seller_id, e.username, + e.reason or "", e.source, e.created_at or ""]) + buf.seek(0) + return StreamingResponse( + iter([buf.getvalue()]), + media_type="text/csv", + headers={"Content-Disposition": "attachment; filename=snipe-blocklist.csv"}, + ) + + +@app.post("/api/blocklist/import", status_code=201) +async def import_blocklist( + file: UploadFile = File(...), + session: CloudUser = Depends(get_session), +): + """Import a CSV blocklist. Columns: platform_seller_id, username, reason (optional).""" + content = await file.read() + try: + text = content.decode("utf-8-sig") # handle BOM from Excel exports + except UnicodeDecodeError: + raise HTTPException(status_code=400, detail="File must be UTF-8 encoded") + + store = Store(session.shared_db) + imported = 0 + errors: list[str] = [] + reader = csv.DictReader(io.StringIO(text)) + + # Accept both full-export format (has 'platform' col) and simple format (no 'platform' col). + for i, row in enumerate(reader, start=2): + seller_id = (row.get("platform_seller_id") or "").strip() + username = (row.get("username") or "").strip() + if not seller_id or not username: + errors.append(f"Row {i}: missing platform_seller_id or username — skipped") + continue + platform = (row.get("platform") or "ebay").strip() + reason = (row.get("reason") or "").strip() or None + store.add_to_blocklist(ScammerEntry( + platform=platform, + platform_seller_id=seller_id, + username=username, + reason=reason, + source="csv_import", + )) + imported += 1 + + log.info("Blocklist import: %d added, %d errors", imported, len(errors)) + return {"imported": imported, "errors": errors} diff --git a/app/db/migrations/006_scammer_blocklist.sql b/app/db/migrations/006_scammer_blocklist.sql new file mode 100644 index 0000000..8be9410 --- /dev/null +++ b/app/db/migrations/006_scammer_blocklist.sql @@ -0,0 +1,13 @@ +CREATE TABLE IF NOT EXISTS scammer_blocklist ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + platform TEXT NOT NULL, + platform_seller_id TEXT NOT NULL, + username TEXT NOT NULL, + reason TEXT, + source TEXT NOT NULL DEFAULT 'manual', -- manual | csv_import | community + created_at TEXT DEFAULT CURRENT_TIMESTAMP, + UNIQUE(platform, platform_seller_id) +); + +CREATE INDEX IF NOT EXISTS idx_scammer_blocklist_lookup + ON scammer_blocklist(platform, platform_seller_id); diff --git a/app/db/migrations/007_background_tasks.sql b/app/db/migrations/007_background_tasks.sql new file mode 100644 index 0000000..fe89658 --- /dev/null +++ b/app/db/migrations/007_background_tasks.sql @@ -0,0 +1,24 @@ +-- 007_background_tasks.sql +-- Shared background task queue used by the LLM/vision task scheduler. +-- Schema mirrors the circuitforge-core standard. +-- Also adds UNIQUE constraint on trust_scores(listing_id) so save_trust_scores() +-- can use ON CONFLICT upsert semantics. + +CREATE TABLE IF NOT EXISTS background_tasks ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + task_type TEXT NOT NULL, + job_id INTEGER NOT NULL DEFAULT 0, + status TEXT NOT NULL DEFAULT 'queued', + params TEXT, + error TEXT, + stage TEXT, + created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE INDEX IF NOT EXISTS idx_bg_tasks_status_type + ON background_tasks (status, task_type); + +-- Enable ON CONFLICT upsert in save_trust_scores() — idempotent on existing DBs. +CREATE UNIQUE INDEX IF NOT EXISTS idx_trust_scores_listing + ON trust_scores (listing_id); diff --git a/app/db/models.py b/app/db/models.py index 5d1d2a1..3f0acde 100644 --- a/app/db/models.py +++ b/app/db/models.py @@ -82,6 +82,18 @@ class SavedSearch: last_run_at: Optional[str] = None +@dataclass +class ScammerEntry: + """A seller manually or community-flagged as a known scammer.""" + platform: str + platform_seller_id: str + username: str + reason: Optional[str] = None + source: str = "manual" # "manual" | "csv_import" | "community" + id: Optional[int] = None + created_at: Optional[str] = None + + @dataclass class PhotoHash: """Perceptual hash store for cross-search dedup (v0.2+). Schema scaffolded in v0.1.""" diff --git a/app/db/store.py b/app/db/store.py index 56fe1b4..b246fa8 100644 --- a/app/db/store.py +++ b/app/db/store.py @@ -7,15 +7,18 @@ from typing import Optional from circuitforge_core.db import get_connection, run_migrations -from .models import Listing, Seller, TrustScore, MarketComp, SavedSearch +from .models import Listing, Seller, TrustScore, MarketComp, SavedSearch, ScammerEntry MIGRATIONS_DIR = Path(__file__).parent / "migrations" class Store: def __init__(self, db_path: Path): + self._db_path = db_path self._conn = get_connection(db_path) run_migrations(self._conn, MIGRATIONS_DIR) + # WAL mode: allows concurrent readers + one writer without blocking + self._conn.execute("PRAGMA journal_mode=WAL") # --- Seller --- @@ -35,11 +38,26 @@ class Store: self.save_sellers([seller]) def save_sellers(self, sellers: list[Seller]) -> None: + # COALESCE preserves enriched signals (account_age_days, category_history_json) + # that were filled by BTF / _ssn passes — never overwrite with NULL from a + # fresh search page that doesn't carry those signals. self._conn.executemany( - "INSERT OR REPLACE INTO sellers " + "INSERT INTO sellers " "(platform, platform_seller_id, username, account_age_days, " "feedback_count, feedback_ratio, category_history_json) " - "VALUES (?,?,?,?,?,?,?)", + "VALUES (?,?,?,?,?,?,?) " + "ON CONFLICT(platform, platform_seller_id) DO UPDATE SET " + " username = excluded.username, " + " feedback_count = excluded.feedback_count, " + " feedback_ratio = excluded.feedback_ratio, " + " account_age_days = COALESCE(excluded.account_age_days, sellers.account_age_days), " + " category_history_json = COALESCE(" + " CASE WHEN excluded.category_history_json IN ('{}', '', NULL) THEN NULL " + " ELSE excluded.category_history_json END, " + " CASE WHEN sellers.category_history_json IN ('{}', '', NULL) THEN NULL " + " ELSE sellers.category_history_json END, " + " '{}'" + " )", [ (s.platform, s.platform_seller_id, s.username, s.account_age_days, s.feedback_count, s.feedback_ratio, s.category_history_json) @@ -224,6 +242,43 @@ class Store: price_at_first_seen=row[17], ) + # --- TrustScore --- + + def save_trust_scores(self, scores: list[TrustScore]) -> None: + """Upsert trust scores keyed by listing_id. + + photo_analysis_json is preserved on conflict so background vision + results written by the task runner are never overwritten by a re-score. + Requires idx_trust_scores_listing UNIQUE index (migration 007). + """ + self._conn.executemany( + "INSERT INTO trust_scores " + "(listing_id, composite_score, account_age_score, feedback_count_score, " + "feedback_ratio_score, price_vs_market_score, category_history_score, " + "photo_hash_duplicate, red_flags_json, score_is_partial) " + "VALUES (?,?,?,?,?,?,?,?,?,?) " + "ON CONFLICT(listing_id) DO UPDATE SET " + " composite_score = excluded.composite_score, " + " account_age_score = excluded.account_age_score, " + " feedback_count_score = excluded.feedback_count_score, " + " feedback_ratio_score = excluded.feedback_ratio_score, " + " price_vs_market_score = excluded.price_vs_market_score, " + " category_history_score= excluded.category_history_score, " + " photo_hash_duplicate = excluded.photo_hash_duplicate, " + " red_flags_json = excluded.red_flags_json, " + " score_is_partial = excluded.score_is_partial, " + " scored_at = CURRENT_TIMESTAMP", + # photo_analysis_json intentionally omitted — runner owns that column + [ + (s.listing_id, s.composite_score, s.account_age_score, + s.feedback_count_score, s.feedback_ratio_score, + s.price_vs_market_score, s.category_history_score, + int(s.photo_hash_duplicate), s.red_flags_json, int(s.score_is_partial)) + for s in scores if s.listing_id + ], + ) + self._conn.commit() + # --- MarketComp --- def save_market_comp(self, comp: MarketComp) -> None: @@ -274,6 +329,58 @@ class Store: ) self._conn.commit() + # --- ScammerBlocklist --- + + def add_to_blocklist(self, entry: ScammerEntry) -> ScammerEntry: + """Upsert a seller into the blocklist. Returns the saved entry with id and created_at.""" + self._conn.execute( + "INSERT INTO scammer_blocklist " + "(platform, platform_seller_id, username, reason, source) " + "VALUES (?,?,?,?,?) " + "ON CONFLICT(platform, platform_seller_id) DO UPDATE SET " + " username = excluded.username, " + " reason = COALESCE(excluded.reason, scammer_blocklist.reason), " + " source = excluded.source", + (entry.platform, entry.platform_seller_id, entry.username, + entry.reason, entry.source), + ) + self._conn.commit() + row = self._conn.execute( + "SELECT id, created_at FROM scammer_blocklist " + "WHERE platform=? AND platform_seller_id=?", + (entry.platform, entry.platform_seller_id), + ).fetchone() + from dataclasses import replace + return replace(entry, id=row[0], created_at=row[1]) + + def remove_from_blocklist(self, platform: str, platform_seller_id: str) -> None: + self._conn.execute( + "DELETE FROM scammer_blocklist WHERE platform=? AND platform_seller_id=?", + (platform, platform_seller_id), + ) + self._conn.commit() + + def is_blocklisted(self, platform: str, platform_seller_id: str) -> bool: + row = self._conn.execute( + "SELECT 1 FROM scammer_blocklist WHERE platform=? AND platform_seller_id=? LIMIT 1", + (platform, platform_seller_id), + ).fetchone() + return row is not None + + def list_blocklist(self, platform: str = "ebay") -> list[ScammerEntry]: + rows = self._conn.execute( + "SELECT platform, platform_seller_id, username, reason, source, id, created_at " + "FROM scammer_blocklist WHERE platform=? ORDER BY created_at DESC", + (platform,), + ).fetchall() + return [ + ScammerEntry( + platform=r[0], platform_seller_id=r[1], username=r[2], + reason=r[3], source=r[4], id=r[5], created_at=r[6], + ) + for r in rows + ] + def get_market_comp(self, platform: str, query_hash: str) -> Optional[MarketComp]: row = self._conn.execute( "SELECT platform, query_hash, median_price, sample_count, expires_at, id, fetched_at " diff --git a/app/platforms/ebay/scraper.py b/app/platforms/ebay/scraper.py index 5c9d999..2635f08 100644 --- a/app/platforms/ebay/scraper.py +++ b/app/platforms/ebay/scraper.py @@ -32,6 +32,9 @@ EBAY_SEARCH_URL = "https://www.ebay.com/sch/i.html" EBAY_ITEM_URL = "https://www.ebay.com/itm/" _HTML_CACHE_TTL = 300 # seconds — 5 minutes _JOINED_RE = re.compile(r"Joined\s+(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\w*\s+(\d{4})", re.I) +# Matches "username (1,234) 99.1% positive feedback" on /itm/ listing pages. +# Capture groups: 1=raw_count ("1,234"), 2=ratio_pct ("99.1"). +_ITEM_FEEDBACK_RE = re.compile(r'\((\d[\d,]*)\)\s*([\d.]+)%\s*positive', re.I) _MONTH_MAP = {m: i+1 for i, m in enumerate( ["Jan","Feb","Mar","Apr","May","Jun","Jul","Aug","Sep","Oct","Nov","Dec"] )} @@ -371,6 +374,23 @@ class ScrapedEbayAdapter(PlatformAdapter): except ValueError: return None + @staticmethod + def _parse_feedback_from_item(html: str) -> tuple[Optional[int], Optional[float]]: + """Parse feedback count and ratio from a listing page seller card. + + Matches 'username (1,234) 99.1% positive feedback'. + Returns (count, ratio) or (None, None) if not found. + """ + m = _ITEM_FEEDBACK_RE.search(html) + if not m: + return None, None + try: + count = int(m.group(1).replace(",", "")) + ratio = float(m.group(2)) / 100.0 + return count, ratio + except ValueError: + return None, None + def enrich_sellers_btf( self, seller_to_listing: dict[str, str], @@ -387,19 +407,38 @@ class ScrapedEbayAdapter(PlatformAdapter): Does not raise — failures per-seller are silently skipped so the main search response is never blocked. """ + db_path = self._store._db_path # capture for thread-local Store creation + def _enrich_one(item: tuple[str, str]) -> None: seller_id, listing_id = item try: html = self._fetch_item_html(listing_id) age_days = self._parse_joined_date(html) + fb_count, fb_ratio = self._parse_feedback_from_item(html) + log.debug( + "BTF enrich: seller=%s age_days=%s feedback=%s ratio=%s", + seller_id, age_days, fb_count, fb_ratio, + ) + if age_days is None and fb_count is None: + return # nothing new to write + thread_store = Store(db_path) + seller = thread_store.get_seller("ebay", seller_id) + if not seller: + log.warning("BTF enrich: seller %s not found in DB", seller_id) + return + from dataclasses import replace + updates: dict = {} if age_days is not None: - seller = self._store.get_seller("ebay", seller_id) - if seller: - from dataclasses import replace - updated = replace(seller, account_age_days=age_days) - self._store.save_seller(updated) - except Exception: - pass # non-fatal: partial score is better than a crashed enrichment + updates["account_age_days"] = age_days + # Only overwrite feedback if the listing page found a real value — + # prefer a fresh count over a 0 that came from a failed search parse. + if fb_count is not None: + updates["feedback_count"] = fb_count + if fb_ratio is not None: + updates["feedback_ratio"] = fb_ratio + thread_store.save_seller(replace(seller, **updates)) + except Exception as exc: + log.warning("BTF enrich failed for %s/%s: %s", seller_id, listing_id, exc) with ThreadPoolExecutor(max_workers=max_workers) as ex: list(ex.map(_enrich_one, seller_to_listing.items())) diff --git a/app/tasks/__init__.py b/app/tasks/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/tasks/runner.py b/app/tasks/runner.py new file mode 100644 index 0000000..beea57a --- /dev/null +++ b/app/tasks/runner.py @@ -0,0 +1,171 @@ +# app/tasks/runner.py +"""Snipe background task runner. + +Implements the run_task_fn interface expected by circuitforge_core.tasks.scheduler. + +Current task types: + trust_photo_analysis — download primary photo, run vision LLM, write + result to trust_scores.photo_analysis_json (Paid tier). + +Prompt note: The vision prompt is a functional first pass. Tune against real +eBay listings before GA — specifically stock-photo vs genuine-product distinction +and the damage vocabulary. +""" +from __future__ import annotations + +import base64 +import json +import logging +from pathlib import Path + +import requests + +from circuitforge_core.db import get_connection +from circuitforge_core.llm import LLMRouter + +log = logging.getLogger(__name__) + +LLM_TASK_TYPES: frozenset[str] = frozenset({"trust_photo_analysis"}) + +VRAM_BUDGETS: dict[str, float] = { + # moondream2 / vision-capable LLM — single image, short response + "trust_photo_analysis": 2.0, +} + +_VISION_SYSTEM_PROMPT = ( + "You are an expert at evaluating eBay listing photos for authenticity and condition. " + "Respond ONLY with a JSON object containing these exact keys:\n" + " is_stock_photo: bool — true if this looks like a manufacturer/marketing image\n" + " visible_damage: bool — true if scratches, dents, cracks, or defects are visible\n" + " authenticity_signal: string — one of 'genuine_product_photo', 'stock_photo', 'unclear'\n" + " confidence: string — one of 'high', 'medium', 'low'\n" + "No explanation outside the JSON object." +) + + +def insert_task( + db_path: Path, + task_type: str, + job_id: int, + *, + params: str | None = None, +) -> tuple[int, bool]: + """Insert a background task if no identical task is already in-flight. + + Uses get_connection() so WAL mode and timeout=30 apply — same as all other + Snipe DB access. Returns (task_id, is_new). + """ + conn = get_connection(db_path) + conn.row_factory = __import__("sqlite3").Row + try: + existing = conn.execute( + "SELECT id FROM background_tasks " + "WHERE task_type=? AND job_id=? AND status IN ('queued','running')", + (task_type, job_id), + ).fetchone() + if existing: + return existing["id"], False + cursor = conn.execute( + "INSERT INTO background_tasks (task_type, job_id, params) VALUES (?,?,?)", + (task_type, job_id, params), + ) + conn.commit() + return cursor.lastrowid, True + finally: + conn.close() + + +def _update_task_status( + db_path: Path, task_id: int, status: str, *, error: str = "" +) -> None: + with get_connection(db_path) as conn: + conn.execute( + "UPDATE background_tasks " + "SET status=?, error=?, updated_at=CURRENT_TIMESTAMP WHERE id=?", + (status, error, task_id), + ) + + +def run_task( + db_path: Path, + task_id: int, + task_type: str, + job_id: int, + params: str | None = None, +) -> None: + """Execute one background task. Called by the scheduler's batch worker.""" + _update_task_status(db_path, task_id, "running") + try: + if task_type == "trust_photo_analysis": + _run_trust_photo_analysis(db_path, job_id, params) + else: + raise ValueError(f"Unknown snipe task type: {task_type!r}") + _update_task_status(db_path, task_id, "completed") + except Exception as exc: + log.exception("Task %d (%s) failed: %s", task_id, task_type, exc) + _update_task_status(db_path, task_id, "failed", error=str(exc)) + + +def _run_trust_photo_analysis( + db_path: Path, + listing_id: int, + params: str | None, +) -> None: + """Download primary listing photo, run vision LLM, write to trust_scores. + + In cloud mode the result must be written to the per-user DB, which differs + from db_path (the scheduler's shared task-queue DB). The enqueue call site + encodes the correct write target as 'user_db' in params; in local mode it + falls back to db_path so the single-DB layout keeps working. + """ + p = json.loads(params or "{}") + photo_url = p.get("photo_url", "") + listing_title = p.get("listing_title", "") + # user_db: per-user DB in cloud mode; same as db_path in local mode. + result_db = Path(p.get("user_db", str(db_path))) + + if not photo_url: + raise ValueError("trust_photo_analysis: 'photo_url' is required in params") + + # Download and base64-encode the photo + resp = requests.get(photo_url, timeout=10) + resp.raise_for_status() + image_b64 = base64.b64encode(resp.content).decode() + + # Build user prompt with optional title context + user_prompt = "Evaluate this eBay listing photo." + if listing_title: + user_prompt = f"Evaluate this eBay listing photo for: {listing_title}" + + # Call LLMRouter with vision capability + router = LLMRouter() + raw = router.complete( + user_prompt, + system=_VISION_SYSTEM_PROMPT, + images=[image_b64], + max_tokens=128, + ) + + # Parse — be lenient: strip markdown fences if present + try: + cleaned = raw.strip().removeprefix("```json").removeprefix("```").removesuffix("```").strip() + analysis = json.loads(cleaned) + except json.JSONDecodeError: + log.warning( + "Vision LLM returned non-JSON for listing %d: %r", listing_id, raw[:200] + ) + analysis = {"raw_response": raw, "parse_error": True} + + with get_connection(result_db) as conn: + conn.execute( + "UPDATE trust_scores SET photo_analysis_json=? WHERE listing_id=?", + (json.dumps(analysis), listing_id), + ) + + log.info( + "Vision analysis for listing %d: stock=%s damage=%s confidence=%s", + listing_id, + analysis.get("is_stock_photo"), + analysis.get("visible_damage"), + analysis.get("confidence"), + ) diff --git a/app/tasks/scheduler.py b/app/tasks/scheduler.py new file mode 100644 index 0000000..a45e0ae --- /dev/null +++ b/app/tasks/scheduler.py @@ -0,0 +1,23 @@ +# app/tasks/scheduler.py +"""Snipe LLM/vision task scheduler — thin shim over circuitforge_core.tasks.scheduler.""" +from __future__ import annotations + +from pathlib import Path + +from circuitforge_core.tasks.scheduler import ( + TaskScheduler, + get_scheduler as _base_get_scheduler, + reset_scheduler, # re-export for tests +) + +from app.tasks.runner import LLM_TASK_TYPES, VRAM_BUDGETS, run_task + + +def get_scheduler(db_path: Path) -> TaskScheduler: + """Return the process-level TaskScheduler singleton for Snipe.""" + return _base_get_scheduler( + db_path=db_path, + run_task_fn=run_task, + task_types=LLM_TASK_TYPES, + vram_budgets=VRAM_BUDGETS, + ) diff --git a/app/tiers.py b/app/tiers.py index c55b5ec..b355466 100644 --- a/app/tiers.py +++ b/app/tiers.py @@ -1,22 +1,44 @@ -"""Snipe feature gates. Delegates to circuitforge_core.tiers.""" +"""Snipe feature gates. Delegates to circuitforge_core.tiers. + +Tier ladder: free < paid < premium +Ultra is not used in Snipe — auto-bidding is the highest-impact feature and is Premium. + +BYOK unlock analog: LOCAL_VISION_UNLOCKABLE — photo_analysis and serial_number_check +unlock when the user has a local vision model (moondream2 (MD2) or equivalent). + +Intentionally ungated (free for all): + - metadata_trust_scoring — core value prop; wide adoption preferred + - hash_dedup — infrastructure, not a differentiator + - market_comps — useful enough to drive signups; not scarce + - scammer_db — community data is more valuable with wider reach + - saved_searches — retention feature; friction cost outweighs gate value +""" from __future__ import annotations from circuitforge_core.tiers import can_use as _core_can_use, TIERS # noqa: F401 # Feature key → minimum tier required. FEATURES: dict[str, str] = { - # Free tier - "metadata_trust_scoring": "free", - "hash_dedup": "free", # Paid tier "photo_analysis": "paid", "serial_number_check": "paid", "ai_image_detection": "paid", "reverse_image_search": "paid", - "saved_searches": "paid", - "background_monitoring": "paid", + "ebay_oauth": "paid", # full trust scores via eBay Trading API + "background_monitoring": "paid", # limited at Paid; see LIMITS below + + # Premium tier + "auto_bidding": "premium", } -# Photo analysis features unlock if user has local vision model (moondream2 (MD2) or similar). +# Per-feature usage limits by tier. None = unlimited. +# Call get_limit(feature, tier) at enforcement points (e.g. before creating a new monitor). +LIMITS: dict[tuple[str, str], int | None] = { + ("background_monitoring", "paid"): 5, + ("background_monitoring", "premium"): 25, +} + +# Unlock photo_analysis and serial_number_check when user has a local vision model. +# Same policy as Peregrine's BYOK_UNLOCKABLE: user is providing the compute. LOCAL_VISION_UNLOCKABLE: frozenset[str] = frozenset({ "photo_analysis", "serial_number_check", @@ -32,3 +54,19 @@ def can_use( if has_local_vision and feature in LOCAL_VISION_UNLOCKABLE: return True return _core_can_use(feature, tier, has_byok=has_byok, _features=FEATURES) + + +def get_limit(feature: str, tier: str) -> int | None: + """Return the usage limit for a feature at the given tier. + + Returns None if the feature is unlimited at this tier. + Returns None if the feature has no entry in LIMITS (treat as unlimited). + Call can_use() first — get_limit() does not check tier eligibility. + + Example: + if can_use("background_monitoring", tier): + limit = get_limit("background_monitoring", tier) + if limit is not None and current_count >= limit: + raise LimitExceeded(f"Paid tier allows {limit} active monitors. Upgrade to Premium for unlimited.") + """ + return LIMITS.get((feature, tier)) diff --git a/app/trust/__init__.py b/app/trust/__init__.py index bc2b9c6..7eb50ff 100644 --- a/app/trust/__init__.py +++ b/app/trust/__init__.py @@ -41,6 +41,7 @@ class TrustScorer: scores = [] for listing, is_dup in zip(listings, duplicates): seller = self._store.get_seller("ebay", listing.seller_platform_id) + blocklisted = self._store.is_blocklisted("ebay", listing.seller_platform_id) if seller: signal_scores = self._meta.score(seller, market_median, listing.price, price_cv) else: @@ -55,6 +56,7 @@ class TrustScorer: first_seen_at=listing.first_seen_at, price=listing.price, price_at_first_seen=listing.price_at_first_seen, + is_blocklisted=blocklisted, ) scores.append(trust) return scores diff --git a/app/trust/aggregator.py b/app/trust/aggregator.py index 5fd6ec0..b768d62 100644 --- a/app/trust/aggregator.py +++ b/app/trust/aggregator.py @@ -76,6 +76,7 @@ class Aggregator: first_seen_at: Optional[str] = None, price: float = 0.0, price_at_first_seen: Optional[float] = None, + is_blocklisted: bool = False, ) -> TrustScore: is_partial = any(v is None for v in signal_scores.values()) clean = {k: (v if v is not None else 0) for k, v in signal_scores.items()} @@ -92,6 +93,23 @@ class Aggregator: red_flags: list[str] = [] + # Blocklist: force established_bad_actor and zero the score regardless of other signals. + if is_blocklisted: + red_flags.append("established_bad_actor") + composite = 0 + return TrustScore( + listing_id=listing_id, + composite_score=composite, + account_age_score=clean["account_age"], + feedback_count_score=clean["feedback_count"], + feedback_ratio_score=clean["feedback_ratio"], + price_vs_market_score=clean["price_vs_market"], + category_history_score=clean["category_history"], + photo_hash_duplicate=photo_hash_duplicate, + red_flags_json=json.dumps(red_flags), + score_is_partial=is_partial, + ) + # Hard filters if seller and seller.account_age_days is not None and seller.account_age_days < HARD_FILTER_AGE_DAYS: red_flags.append("new_account") @@ -100,6 +118,11 @@ class Aggregator: and seller.feedback_count > HARD_FILTER_BAD_RATIO_MIN_COUNT ): red_flags.append("established_bad_actor") + if seller and seller.feedback_count == 0: + red_flags.append("zero_feedback") + # Zero feedback is a deliberate signal, not missing data — cap composite score + # so a 0-feedback seller can never appear trustworthy on other signals alone. + composite = min(composite, 35) # Soft flags if seller and seller.account_age_days is not None and seller.account_age_days < 30: diff --git a/pyproject.toml b/pyproject.toml index da08de8..ba5c077 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,10 +17,12 @@ dependencies = [ "beautifulsoup4>=4.12", "lxml>=5.0", "fastapi>=0.111", + "python-multipart>=0.0.9", "uvicorn[standard]>=0.29", "playwright>=1.44", "playwright-stealth>=1.0", "cryptography>=42.0", + "PyJWT>=2.8", ] [tool.setuptools.packages.find] diff --git a/tests/test_tasks/__init__.py b/tests/test_tasks/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_tasks/test_runner.py b/tests/test_tasks/test_runner.py new file mode 100644 index 0000000..0a44b7c --- /dev/null +++ b/tests/test_tasks/test_runner.py @@ -0,0 +1,158 @@ +"""Tests for snipe background task runner.""" +from __future__ import annotations + +import json +import sqlite3 +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from app.tasks.runner import ( + LLM_TASK_TYPES, + VRAM_BUDGETS, + insert_task, + run_task, +) + + +@pytest.fixture +def tmp_db(tmp_path: Path) -> Path: + db = tmp_path / "snipe.db" + conn = sqlite3.connect(db) + conn.executescript(""" + CREATE TABLE background_tasks ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + task_type TEXT NOT NULL, + job_id INTEGER NOT NULL DEFAULT 0, + status TEXT NOT NULL DEFAULT 'queued', + params TEXT, + error TEXT, + stage TEXT, + created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP + ); + CREATE TABLE trust_scores ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + listing_id INTEGER NOT NULL, + composite_score INTEGER NOT NULL DEFAULT 0, + photo_analysis_json TEXT, + red_flags_json TEXT NOT NULL DEFAULT '[]', + scored_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP + ); + INSERT INTO trust_scores (listing_id, composite_score) VALUES (1, 72); + """) + conn.commit() + conn.close() + return db + + +def test_llm_task_types_defined(): + assert "trust_photo_analysis" in LLM_TASK_TYPES + + +def test_vram_budgets_defined(): + assert "trust_photo_analysis" in VRAM_BUDGETS + assert VRAM_BUDGETS["trust_photo_analysis"] > 0 + + +def test_insert_task_creates_row(tmp_db: Path): + task_id, is_new = insert_task(tmp_db, "trust_photo_analysis", job_id=1) + assert is_new is True + conn = sqlite3.connect(tmp_db) + row = conn.execute( + "SELECT status FROM background_tasks WHERE id=?", (task_id,) + ).fetchone() + conn.close() + assert row[0] == "queued" + + +def test_insert_task_dedup(tmp_db: Path): + id1, new1 = insert_task(tmp_db, "trust_photo_analysis", job_id=1) + id2, new2 = insert_task(tmp_db, "trust_photo_analysis", job_id=1) + assert id1 == id2 + assert new1 is True + assert new2 is False + + +def test_run_task_photo_analysis_success(tmp_db: Path): + """Vision analysis result is written to trust_scores.photo_analysis_json.""" + params = json.dumps({ + "listing_id": 1, + "photo_url": "https://example.com/photo.jpg", + "listing_title": "Used iPhone 13", + }) + task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=params) + + vision_result = { + "is_stock_photo": False, + "visible_damage": False, + "authenticity_signal": "genuine_product_photo", + "confidence": "high", + } + + with patch("app.tasks.runner.requests") as mock_req, \ + patch("app.tasks.runner.LLMRouter") as MockRouter: + mock_req.get.return_value.content = b"fake_image_bytes" + mock_req.get.return_value.raise_for_status = lambda: None + instance = MockRouter.return_value + instance.complete.return_value = json.dumps(vision_result) + run_task(tmp_db, task_id, "trust_photo_analysis", 1, params) + + conn = sqlite3.connect(tmp_db) + score_row = conn.execute( + "SELECT photo_analysis_json FROM trust_scores WHERE listing_id=1" + ).fetchone() + task_row = conn.execute( + "SELECT status FROM background_tasks WHERE id=?", (task_id,) + ).fetchone() + conn.close() + assert task_row[0] == "completed" + parsed = json.loads(score_row[0]) + assert parsed["is_stock_photo"] is False + + +def test_run_task_photo_fetch_failure_marks_failed(tmp_db: Path): + """If photo download fails, task is marked failed without crashing.""" + params = json.dumps({ + "listing_id": 1, + "photo_url": "https://example.com/bad.jpg", + "listing_title": "Laptop", + }) + task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=params) + + with patch("app.tasks.runner.requests") as mock_req: + mock_req.get.side_effect = ConnectionError("fetch failed") + run_task(tmp_db, task_id, "trust_photo_analysis", 1, params) + + conn = sqlite3.connect(tmp_db) + row = conn.execute( + "SELECT status, error FROM background_tasks WHERE id=?", (task_id,) + ).fetchone() + conn.close() + assert row[0] == "failed" + assert "fetch failed" in row[1] + + +def test_run_task_no_photo_url_marks_failed(tmp_db: Path): + params = json.dumps({"listing_id": 1}) + task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=params) + run_task(tmp_db, task_id, "trust_photo_analysis", 1, params) + conn = sqlite3.connect(tmp_db) + row = conn.execute( + "SELECT status, error FROM background_tasks WHERE id=?", (task_id,) + ).fetchone() + conn.close() + assert row[0] == "failed" + assert "photo_url" in row[1] + + +def test_run_task_unknown_type_marks_failed(tmp_db: Path): + task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1) + run_task(tmp_db, task_id, "unknown_type", 1, None) + conn = sqlite3.connect(tmp_db) + row = conn.execute( + "SELECT status FROM background_tasks WHERE id=?", (task_id,) + ).fetchone() + conn.close() + assert row[0] == "failed" diff --git a/tests/test_tiers.py b/tests/test_tiers.py index bf8d3cf..11dc250 100644 --- a/tests/test_tiers.py +++ b/tests/test_tiers.py @@ -18,6 +18,7 @@ def test_byok_does_not_unlock_photo_analysis(): assert can_use("photo_analysis", tier="free", has_byok=True) is False -def test_saved_searches_require_paid(): - assert can_use("saved_searches", tier="free") is False +def test_saved_searches_are_free(): + # Ungated: retention feature — friction cost outweighs gate value (see tiers.py) + assert can_use("saved_searches", tier="free") is True assert can_use("saved_searches", tier="paid") is True diff --git a/web/src/App.vue b/web/src/App.vue index d99ff1f..abbd99e 100644 --- a/web/src/App.vue +++ b/web/src/App.vue @@ -18,17 +18,20 @@ import { useMotion } from './composables/useMotion' import { useSnipeMode } from './composables/useSnipeMode' import { useKonamiCode } from './composables/useKonamiCode' import { useSessionStore } from './stores/session' +import { useBlocklistStore } from './stores/blocklist' import AppNav from './components/AppNav.vue' const motion = useMotion() const { activate, restore } = useSnipeMode() const session = useSessionStore() +const blocklistStore = useBlocklistStore() useKonamiCode(activate) onMounted(() => { - restore() // re-apply snipe mode from localStorage on hard reload - session.bootstrap() // fetch tier + feature flags from API + restore() // re-apply snipe mode from localStorage on hard reload + session.bootstrap() // fetch tier + feature flags from API + blocklistStore.fetchBlocklist() // pre-load so card block buttons reflect state immediately }) diff --git a/web/src/assets/theme.css b/web/src/assets/theme.css index d68dc1d..1febc84 100644 --- a/web/src/assets/theme.css +++ b/web/src/assets/theme.css @@ -4,7 +4,10 @@ Snipe Mode easter egg: activated by Konami code (cf-snipe-mode in localStorage). */ -/* ── Snipe — dark tactical (default — always dark) ─ */ +/* ── Snipe — dark tactical (default) ─────────────── + Light variant is defined below via prefers-color-scheme. + Snipe Mode easter egg always overrides both. +*/ :root { /* Brand — amber target reticle */ --app-primary: #f59e0b; @@ -71,6 +74,49 @@ --sidebar-width: 220px; } +/* ── Light mode — field notebook / tactical map ───── + Warm cream surfaces with the same amber accent. + Snipe Mode data attribute overrides this via higher specificity. +*/ +@media (prefers-color-scheme: light) { + :root:not([data-snipe-mode="active"]) { + /* Surfaces — warm cream, like a tactical field notebook */ + --color-surface: #f8f5ee; + --color-surface-2: #f0ece3; + --color-surface-raised: #e8e3d8; + + /* Borders — warm khaki */ + --color-border: #c8bfae; + --color-border-light: #dbd3c4; + + /* Text — warm near-black ink */ + --color-text: #1c1a16; + --color-text-muted: #6b6357; + --color-text-inverse: #f8f5ee; + + /* Brand — amber stays identical (works great on light too) */ + --app-primary: #d97706; /* slightly deeper for contrast on light */ + --app-primary-hover: #b45309; + --app-primary-light: rgba(217, 119, 6, 0.12); + + /* Trust signals — same hues, adjusted for legibility on cream */ + --trust-high: #16a34a; + --trust-mid: #b45309; + --trust-low: #dc2626; + + /* Semantic */ + --color-success: #16a34a; + --color-error: #dc2626; + --color-warning: #b45309; + --color-info: #2563eb; + + /* Shadows — lighter, warm tint */ + --shadow-sm: 0 1px 3px rgba(60, 45, 20, 0.12), 0 1px 2px rgba(60, 45, 20, 0.08); + --shadow-md: 0 4px 12px rgba(60, 45, 20, 0.15), 0 2px 4px rgba(60, 45, 20, 0.1); + --shadow-lg: 0 10px 30px rgba(60, 45, 20, 0.2), 0 4px 8px rgba(60, 45, 20, 0.1); + } +} + /* ── Snipe Mode easter egg theme ─────────────────── */ /* Activated by Konami code; stored as 'cf-snipe-mode' in localStorage */ /* Applied: document.documentElement.dataset.snipeMode = 'active' */ diff --git a/web/src/components/AppNav.vue b/web/src/components/AppNav.vue index bc1e78d..48ef111 100644 --- a/web/src/components/AppNav.vue +++ b/web/src/components/AppNav.vue @@ -66,20 +66,23 @@ import { MagnifyingGlassIcon, BookmarkIcon, Cog6ToothIcon, + ShieldExclamationIcon, } from '@heroicons/vue/24/outline' import { useSnipeMode } from '../composables/useSnipeMode' const { active: isSnipeMode, deactivate } = useSnipeMode() const navLinks = computed(() => [ - { to: '/', icon: MagnifyingGlassIcon, label: 'Search' }, - { to: '/saved', icon: BookmarkIcon, label: 'Saved' }, + { to: '/', icon: MagnifyingGlassIcon, label: 'Search' }, + { to: '/saved', icon: BookmarkIcon, label: 'Saved' }, + { to: '/blocklist', icon: ShieldExclamationIcon, label: 'Blocklist' }, ]) const mobileLinks = [ - { to: '/', icon: MagnifyingGlassIcon, label: 'Search' }, - { to: '/saved', icon: BookmarkIcon, label: 'Saved' }, - { to: '/settings', icon: Cog6ToothIcon, label: 'Settings' }, + { to: '/', icon: MagnifyingGlassIcon, label: 'Search' }, + { to: '/saved', icon: BookmarkIcon, label: 'Saved' }, + { to: '/blocklist', icon: ShieldExclamationIcon, label: 'Block' }, + { to: '/settings', icon: Cog6ToothIcon, label: 'Settings' }, ] diff --git a/web/src/components/ListingCard.vue b/web/src/components/ListingCard.vue index b96b8cd..2dbf3e7 100644 --- a/web/src/components/ListingCard.vue +++ b/web/src/components/ListingCard.vue @@ -4,6 +4,7 @@ :class="{ 'steal-card': isSteal, 'listing-card--auction': isAuction && hoursRemaining !== null && hoursRemaining > 1, + 'listing-card--triple-red': tripleRed, }" > @@ -69,6 +70,25 @@
+ +Block {{ seller?.username }}?
+ +{{ blockError }}
+