diff --git a/api/main.py b/api/main.py index ca9ddef..2e12cd0 100644 --- a/api/main.py +++ b/api/main.py @@ -6,15 +6,19 @@ import hashlib import logging import os from concurrent.futures import ThreadPoolExecutor +from contextlib import asynccontextmanager from pathlib import Path -from fastapi import Depends, FastAPI, HTTPException +import csv +import io +from fastapi import Depends, FastAPI, HTTPException, UploadFile, File +from fastapi.responses import StreamingResponse from pydantic import BaseModel from fastapi.middleware.cors import CORSMiddleware from circuitforge_core.config import load_env from app.db.store import Store -from app.db.models import SavedSearch as SavedSearchModel +from app.db.models import SavedSearch as SavedSearchModel, ScammerEntry from app.platforms import SearchFilters from app.platforms.ebay.scraper import ScrapedEbayAdapter from app.platforms.ebay.adapter import EbayAdapter @@ -28,6 +32,21 @@ load_env(Path(".env")) log = logging.getLogger(__name__) +@asynccontextmanager +async def _lifespan(app: FastAPI): + # Start vision/LLM background task scheduler + from app.tasks.scheduler import get_scheduler + from api.cloud_session import _LOCAL_SNIPE_DB + get_scheduler(_LOCAL_SNIPE_DB) + log.info("Snipe task scheduler started (db=%s)", _LOCAL_SNIPE_DB) + yield + # Graceful shutdown + from app.tasks.scheduler import reset_scheduler + get_scheduler(_LOCAL_SNIPE_DB).shutdown(timeout=10.0) + reset_scheduler() + log.info("Snipe task scheduler stopped.") + + def _ebay_creds() -> tuple[str, str, str]: """Return (client_id, client_secret, env) from env vars. @@ -43,7 +62,7 @@ def _ebay_creds() -> tuple[str, str, str]: client_secret = (os.environ.get("EBAY_CERT_ID") or os.environ.get("EBAY_CLIENT_SECRET", "")).strip() return client_id, client_secret, env -app = FastAPI(title="Snipe API", version="0.1.0") +app = FastAPI(title="Snipe API", version="0.1.0", lifespan=_lifespan) app.include_router(ebay_webhook_router) app.add_middleware( @@ -111,7 +130,7 @@ def _trigger_scraper_enrichment( seller = shared_store.get_seller("ebay", sid) if not seller: continue - if (seller.account_age_days is None + if ((seller.account_age_days is None or seller.feedback_count == 0) and sid not in needs_btf and len(needs_btf) < _BTF_MAX_PER_SEARCH): needs_btf[sid] = listing.platform_listing_id @@ -359,7 +378,9 @@ def enrich_seller( pass # no API creds — fall through to BTF seller_obj = shared_store.get_seller("ebay", seller) - needs_btf = seller_obj is not None and seller_obj.account_age_days is None + needs_btf = seller_obj is not None and ( + seller_obj.account_age_days is None or seller_obj.feedback_count == 0 + ) needs_categories = seller_obj is None or seller_obj.category_history_json in ("{}", "", None) # Slow path: Playwright for remaining gaps (BTF + _ssn in parallel threads). @@ -458,3 +479,95 @@ def delete_saved_search(saved_id: int, session: CloudUser = Depends(get_session) def mark_saved_search_run(saved_id: int, session: CloudUser = Depends(get_session)): Store(session.user_db).update_saved_search_last_run(saved_id) return {"ok": True} + + +# ── Scammer Blocklist ───────────────────────────────────────────────────────── +# Blocklist lives in shared_db: all users on a shared cloud instance see the +# same community blocklist. In local (single-user) mode shared_db == user_db. + +class BlocklistAdd(BaseModel): + platform: str = "ebay" + platform_seller_id: str + username: str + reason: str = "" + + +@app.get("/api/blocklist") +def list_blocklist(session: CloudUser = Depends(get_session)): + store = Store(session.shared_db) + return {"entries": [dataclasses.asdict(e) for e in store.list_blocklist()]} + + +@app.post("/api/blocklist", status_code=201) +def add_to_blocklist(body: BlocklistAdd, session: CloudUser = Depends(get_session)): + store = Store(session.shared_db) + entry = store.add_to_blocklist(ScammerEntry( + platform=body.platform, + platform_seller_id=body.platform_seller_id, + username=body.username, + reason=body.reason or None, + source="manual", + )) + return dataclasses.asdict(entry) + + +@app.delete("/api/blocklist/{platform_seller_id}", status_code=204) +def remove_from_blocklist(platform_seller_id: str, session: CloudUser = Depends(get_session)): + Store(session.shared_db).remove_from_blocklist("ebay", platform_seller_id) + + +@app.get("/api/blocklist/export") +def export_blocklist(session: CloudUser = Depends(get_session)): + """Download the blocklist as a CSV file.""" + entries = Store(session.shared_db).list_blocklist() + buf = io.StringIO() + writer = csv.writer(buf) + writer.writerow(["platform", "platform_seller_id", "username", "reason", "source", "created_at"]) + for e in entries: + writer.writerow([e.platform, e.platform_seller_id, e.username, + e.reason or "", e.source, e.created_at or ""]) + buf.seek(0) + return StreamingResponse( + iter([buf.getvalue()]), + media_type="text/csv", + headers={"Content-Disposition": "attachment; filename=snipe-blocklist.csv"}, + ) + + +@app.post("/api/blocklist/import", status_code=201) +async def import_blocklist( + file: UploadFile = File(...), + session: CloudUser = Depends(get_session), +): + """Import a CSV blocklist. Columns: platform_seller_id, username, reason (optional).""" + content = await file.read() + try: + text = content.decode("utf-8-sig") # handle BOM from Excel exports + except UnicodeDecodeError: + raise HTTPException(status_code=400, detail="File must be UTF-8 encoded") + + store = Store(session.shared_db) + imported = 0 + errors: list[str] = [] + reader = csv.DictReader(io.StringIO(text)) + + # Accept both full-export format (has 'platform' col) and simple format (no 'platform' col). + for i, row in enumerate(reader, start=2): + seller_id = (row.get("platform_seller_id") or "").strip() + username = (row.get("username") or "").strip() + if not seller_id or not username: + errors.append(f"Row {i}: missing platform_seller_id or username — skipped") + continue + platform = (row.get("platform") or "ebay").strip() + reason = (row.get("reason") or "").strip() or None + store.add_to_blocklist(ScammerEntry( + platform=platform, + platform_seller_id=seller_id, + username=username, + reason=reason, + source="csv_import", + )) + imported += 1 + + log.info("Blocklist import: %d added, %d errors", imported, len(errors)) + return {"imported": imported, "errors": errors} diff --git a/app/db/migrations/002_background_tasks.sql b/app/db/migrations/002_background_tasks.sql new file mode 100644 index 0000000..063e104 --- /dev/null +++ b/app/db/migrations/002_background_tasks.sql @@ -0,0 +1,18 @@ +-- 002_background_tasks.sql +-- Shared background task queue used by the LLM/vision task scheduler. +-- Schema mirrors the circuitforge-core standard. + +CREATE TABLE IF NOT EXISTS background_tasks ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + task_type TEXT NOT NULL, + job_id INTEGER NOT NULL DEFAULT 0, + status TEXT NOT NULL DEFAULT 'queued', + params TEXT, + error TEXT, + stage TEXT, + created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE INDEX IF NOT EXISTS idx_bg_tasks_status_type + ON background_tasks (status, task_type); diff --git a/app/tasks/__init__.py b/app/tasks/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/tasks/runner.py b/app/tasks/runner.py new file mode 100644 index 0000000..b78aa9e --- /dev/null +++ b/app/tasks/runner.py @@ -0,0 +1,159 @@ +# app/tasks/runner.py +"""Snipe background task runner. + +Implements the run_task_fn interface expected by circuitforge_core.tasks.scheduler. + +Current task types: + trust_photo_analysis — download primary photo, run vision LLM, write + result to trust_scores.photo_analysis_json (Paid tier). + +Prompt note: The vision prompt is a functional first pass. Tune against real +eBay listings before GA — specifically stock-photo vs genuine-product distinction +and the damage vocabulary. +""" +from __future__ import annotations + +import base64 +import json +import logging +import sqlite3 +from pathlib import Path + +import requests + +from circuitforge_core.llm import LLMRouter + +log = logging.getLogger(__name__) + +LLM_TASK_TYPES: frozenset[str] = frozenset({"trust_photo_analysis"}) + +VRAM_BUDGETS: dict[str, float] = { + # moondream2 / vision-capable LLM — single image, short response + "trust_photo_analysis": 2.0, +} + +_VISION_SYSTEM_PROMPT = ( + "You are an expert at evaluating eBay listing photos for authenticity and condition. " + "Respond ONLY with a JSON object containing these exact keys:\n" + " is_stock_photo: bool — true if this looks like a manufacturer/marketing image\n" + " visible_damage: bool — true if scratches, dents, cracks, or defects are visible\n" + " authenticity_signal: string — one of 'genuine_product_photo', 'stock_photo', 'unclear'\n" + " confidence: string — one of 'high', 'medium', 'low'\n" + "No explanation outside the JSON object." +) + + +def insert_task( + db_path: Path, + task_type: str, + job_id: int, + *, + params: str | None = None, +) -> tuple[int, bool]: + """Insert a background task if no identical task is already in-flight.""" + conn = sqlite3.connect(db_path) + conn.row_factory = sqlite3.Row + existing = conn.execute( + "SELECT id FROM background_tasks " + "WHERE task_type=? AND job_id=? AND status IN ('queued','running')", + (task_type, job_id), + ).fetchone() + if existing: + conn.close() + return existing["id"], False + cursor = conn.execute( + "INSERT INTO background_tasks (task_type, job_id, params) VALUES (?,?,?)", + (task_type, job_id, params), + ) + conn.commit() + task_id = cursor.lastrowid + conn.close() + return task_id, True + + +def _update_task_status( + db_path: Path, task_id: int, status: str, *, error: str = "" +) -> None: + with sqlite3.connect(db_path) as conn: + conn.execute( + "UPDATE background_tasks " + "SET status=?, error=?, updated_at=CURRENT_TIMESTAMP WHERE id=?", + (status, error, task_id), + ) + + +def run_task( + db_path: Path, + task_id: int, + task_type: str, + job_id: int, + params: str | None = None, +) -> None: + """Execute one background task. Called by the scheduler's batch worker.""" + _update_task_status(db_path, task_id, "running") + try: + if task_type == "trust_photo_analysis": + _run_trust_photo_analysis(db_path, job_id, params) + else: + raise ValueError(f"Unknown snipe task type: {task_type!r}") + _update_task_status(db_path, task_id, "completed") + except Exception as exc: + log.exception("Task %d (%s) failed: %s", task_id, task_type, exc) + _update_task_status(db_path, task_id, "failed", error=str(exc)) + + +def _run_trust_photo_analysis( + db_path: Path, + listing_id: int, + params: str | None, +) -> None: + """Download primary listing photo, run vision LLM, write to trust_scores.""" + p = json.loads(params or "{}") + photo_url = p.get("photo_url", "") + listing_title = p.get("listing_title", "") + + if not photo_url: + raise ValueError("trust_photo_analysis: 'photo_url' is required in params") + + # Download and base64-encode the photo + resp = requests.get(photo_url, timeout=10) + resp.raise_for_status() + image_b64 = base64.b64encode(resp.content).decode() + + # Build user prompt with optional title context + user_prompt = "Evaluate this eBay listing photo." + if listing_title: + user_prompt = f"Evaluate this eBay listing photo for: {listing_title}" + + # Call LLMRouter with vision capability + router = LLMRouter() + raw = router.complete( + user_prompt, + system=_VISION_SYSTEM_PROMPT, + images=[image_b64], + max_tokens=128, + ) + + # Parse — be lenient: strip markdown fences if present + try: + cleaned = raw.strip().removeprefix("```json").removeprefix("```").removesuffix("```").strip() + analysis = json.loads(cleaned) + except json.JSONDecodeError: + log.warning( + "Vision LLM returned non-JSON for listing %d: %r", listing_id, raw[:200] + ) + analysis = {"raw_response": raw, "parse_error": True} + + with sqlite3.connect(db_path) as conn: + conn.execute( + "UPDATE trust_scores SET photo_analysis_json=? WHERE listing_id=?", + (json.dumps(analysis), listing_id), + ) + + log.info( + "Vision analysis for listing %d: stock=%s damage=%s confidence=%s", + listing_id, + analysis.get("is_stock_photo"), + analysis.get("visible_damage"), + analysis.get("confidence"), + ) diff --git a/app/tasks/scheduler.py b/app/tasks/scheduler.py new file mode 100644 index 0000000..a45e0ae --- /dev/null +++ b/app/tasks/scheduler.py @@ -0,0 +1,23 @@ +# app/tasks/scheduler.py +"""Snipe LLM/vision task scheduler — thin shim over circuitforge_core.tasks.scheduler.""" +from __future__ import annotations + +from pathlib import Path + +from circuitforge_core.tasks.scheduler import ( + TaskScheduler, + get_scheduler as _base_get_scheduler, + reset_scheduler, # re-export for tests +) + +from app.tasks.runner import LLM_TASK_TYPES, VRAM_BUDGETS, run_task + + +def get_scheduler(db_path: Path) -> TaskScheduler: + """Return the process-level TaskScheduler singleton for Snipe.""" + return _base_get_scheduler( + db_path=db_path, + run_task_fn=run_task, + task_types=LLM_TASK_TYPES, + vram_budgets=VRAM_BUDGETS, + ) diff --git a/tests/test_tasks/__init__.py b/tests/test_tasks/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_tasks/test_runner.py b/tests/test_tasks/test_runner.py new file mode 100644 index 0000000..1c8a9b2 --- /dev/null +++ b/tests/test_tasks/test_runner.py @@ -0,0 +1,157 @@ +"""Tests for snipe background task runner.""" +from __future__ import annotations + +import json +import sqlite3 +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from app.tasks.runner import ( + LLM_TASK_TYPES, + VRAM_BUDGETS, + insert_task, + run_task, +) + + +@pytest.fixture +def tmp_db(tmp_path: Path) -> Path: + db = tmp_path / "snipe.db" + conn = sqlite3.connect(db) + conn.executescript(""" + CREATE TABLE background_tasks ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + task_type TEXT NOT NULL, + job_id INTEGER NOT NULL DEFAULT 0, + status TEXT NOT NULL DEFAULT 'queued', + params TEXT, + error TEXT, + created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP + ); + CREATE TABLE trust_scores ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + listing_id INTEGER NOT NULL, + composite_score INTEGER NOT NULL DEFAULT 0, + photo_analysis_json TEXT, + red_flags_json TEXT NOT NULL DEFAULT '[]', + scored_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP + ); + INSERT INTO trust_scores (listing_id, composite_score) VALUES (1, 72); + """) + conn.commit() + conn.close() + return db + + +def test_llm_task_types_defined(): + assert "trust_photo_analysis" in LLM_TASK_TYPES + + +def test_vram_budgets_defined(): + assert "trust_photo_analysis" in VRAM_BUDGETS + assert VRAM_BUDGETS["trust_photo_analysis"] > 0 + + +def test_insert_task_creates_row(tmp_db: Path): + task_id, is_new = insert_task(tmp_db, "trust_photo_analysis", job_id=1) + assert is_new is True + conn = sqlite3.connect(tmp_db) + row = conn.execute( + "SELECT status FROM background_tasks WHERE id=?", (task_id,) + ).fetchone() + conn.close() + assert row[0] == "queued" + + +def test_insert_task_dedup(tmp_db: Path): + id1, new1 = insert_task(tmp_db, "trust_photo_analysis", job_id=1) + id2, new2 = insert_task(tmp_db, "trust_photo_analysis", job_id=1) + assert id1 == id2 + assert new1 is True + assert new2 is False + + +def test_run_task_photo_analysis_success(tmp_db: Path): + """Vision analysis result is written to trust_scores.photo_analysis_json.""" + params = json.dumps({ + "listing_id": 1, + "photo_url": "https://example.com/photo.jpg", + "listing_title": "Used iPhone 13", + }) + task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=params) + + vision_result = { + "is_stock_photo": False, + "visible_damage": False, + "authenticity_signal": "genuine_product_photo", + "confidence": "high", + } + + with patch("app.tasks.runner.requests") as mock_req, \ + patch("app.tasks.runner.LLMRouter") as MockRouter: + mock_req.get.return_value.content = b"fake_image_bytes" + mock_req.get.return_value.raise_for_status = lambda: None + instance = MockRouter.return_value + instance.complete.return_value = json.dumps(vision_result) + run_task(tmp_db, task_id, "trust_photo_analysis", 1, params) + + conn = sqlite3.connect(tmp_db) + score_row = conn.execute( + "SELECT photo_analysis_json FROM trust_scores WHERE listing_id=1" + ).fetchone() + task_row = conn.execute( + "SELECT status FROM background_tasks WHERE id=?", (task_id,) + ).fetchone() + conn.close() + assert task_row[0] == "completed" + parsed = json.loads(score_row[0]) + assert parsed["is_stock_photo"] is False + + +def test_run_task_photo_fetch_failure_marks_failed(tmp_db: Path): + """If photo download fails, task is marked failed without crashing.""" + params = json.dumps({ + "listing_id": 1, + "photo_url": "https://example.com/bad.jpg", + "listing_title": "Laptop", + }) + task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=params) + + with patch("app.tasks.runner.requests") as mock_req: + mock_req.get.side_effect = ConnectionError("fetch failed") + run_task(tmp_db, task_id, "trust_photo_analysis", 1, params) + + conn = sqlite3.connect(tmp_db) + row = conn.execute( + "SELECT status, error FROM background_tasks WHERE id=?", (task_id,) + ).fetchone() + conn.close() + assert row[0] == "failed" + assert "fetch failed" in row[1] + + +def test_run_task_no_photo_url_marks_failed(tmp_db: Path): + params = json.dumps({"listing_id": 1}) + task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=params) + run_task(tmp_db, task_id, "trust_photo_analysis", 1, params) + conn = sqlite3.connect(tmp_db) + row = conn.execute( + "SELECT status, error FROM background_tasks WHERE id=?", (task_id,) + ).fetchone() + conn.close() + assert row[0] == "failed" + assert "photo_url" in row[1] + + +def test_run_task_unknown_type_marks_failed(tmp_db: Path): + task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1) + run_task(tmp_db, task_id, "unknown_type", 1, None) + conn = sqlite3.connect(tmp_db) + row = conn.execute( + "SELECT status FROM background_tasks WHERE id=?", (task_id,) + ).fetchone() + conn.close() + assert row[0] == "failed"