feat(tasks): add vision task scheduler for trust_photo_analysis

Wires circuitforge_core.tasks.scheduler into Snipe. Adds trust_photo_analysis
background task: downloads primary listing photo, calls LLMRouter with vision
capability, writes result to trust_scores.photo_analysis_json (Paid tier).
photo_analysis_json column already existed in 001_init.sql migration.
This commit is contained in:
pyr0ball 2026-03-31 09:27:55 -07:00
parent f26020cf7f
commit f7c5e8dc17
7 changed files with 475 additions and 5 deletions

View file

@ -6,15 +6,19 @@ import hashlib
import logging import logging
import os import os
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from contextlib import asynccontextmanager
from pathlib import Path from pathlib import Path
from fastapi import Depends, FastAPI, HTTPException import csv
import io
from fastapi import Depends, FastAPI, HTTPException, UploadFile, File
from fastapi.responses import StreamingResponse
from pydantic import BaseModel from pydantic import BaseModel
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from circuitforge_core.config import load_env from circuitforge_core.config import load_env
from app.db.store import Store from app.db.store import Store
from app.db.models import SavedSearch as SavedSearchModel from app.db.models import SavedSearch as SavedSearchModel, ScammerEntry
from app.platforms import SearchFilters from app.platforms import SearchFilters
from app.platforms.ebay.scraper import ScrapedEbayAdapter from app.platforms.ebay.scraper import ScrapedEbayAdapter
from app.platforms.ebay.adapter import EbayAdapter from app.platforms.ebay.adapter import EbayAdapter
@ -28,6 +32,21 @@ load_env(Path(".env"))
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@asynccontextmanager
async def _lifespan(app: FastAPI):
    """FastAPI lifespan hook: run the Snipe task scheduler for the app's lifetime.

    Startup (before ``yield``): create/start the process-level scheduler
    singleton bound to the local Snipe DB. Shutdown (after ``yield``): stop it
    with a bounded timeout, then clear the singleton so a fresh app (or test)
    can re-create it.
    """
    # Start vision/LLM background task scheduler
    # Imports are deferred to avoid import-time side effects at module load.
    from app.tasks.scheduler import get_scheduler
    from api.cloud_session import _LOCAL_SNIPE_DB
    get_scheduler(_LOCAL_SNIPE_DB)
    log.info("Snipe task scheduler started (db=%s)", _LOCAL_SNIPE_DB)
    yield
    # Graceful shutdown
    from app.tasks.scheduler import reset_scheduler
    # get_scheduler returns the existing singleton here, so this shuts down
    # the instance started above; 10s cap keeps app shutdown bounded.
    get_scheduler(_LOCAL_SNIPE_DB).shutdown(timeout=10.0)
    reset_scheduler()
    log.info("Snipe task scheduler stopped.")
def _ebay_creds() -> tuple[str, str, str]: def _ebay_creds() -> tuple[str, str, str]:
"""Return (client_id, client_secret, env) from env vars. """Return (client_id, client_secret, env) from env vars.
@ -43,7 +62,7 @@ def _ebay_creds() -> tuple[str, str, str]:
client_secret = (os.environ.get("EBAY_CERT_ID") or os.environ.get("EBAY_CLIENT_SECRET", "")).strip() client_secret = (os.environ.get("EBAY_CERT_ID") or os.environ.get("EBAY_CLIENT_SECRET", "")).strip()
return client_id, client_secret, env return client_id, client_secret, env
app = FastAPI(title="Snipe API", version="0.1.0") app = FastAPI(title="Snipe API", version="0.1.0", lifespan=_lifespan)
app.include_router(ebay_webhook_router) app.include_router(ebay_webhook_router)
app.add_middleware( app.add_middleware(
@ -111,7 +130,7 @@ def _trigger_scraper_enrichment(
seller = shared_store.get_seller("ebay", sid) seller = shared_store.get_seller("ebay", sid)
if not seller: if not seller:
continue continue
if (seller.account_age_days is None if ((seller.account_age_days is None or seller.feedback_count == 0)
and sid not in needs_btf and sid not in needs_btf
and len(needs_btf) < _BTF_MAX_PER_SEARCH): and len(needs_btf) < _BTF_MAX_PER_SEARCH):
needs_btf[sid] = listing.platform_listing_id needs_btf[sid] = listing.platform_listing_id
@ -359,7 +378,9 @@ def enrich_seller(
pass # no API creds — fall through to BTF pass # no API creds — fall through to BTF
seller_obj = shared_store.get_seller("ebay", seller) seller_obj = shared_store.get_seller("ebay", seller)
needs_btf = seller_obj is not None and seller_obj.account_age_days is None needs_btf = seller_obj is not None and (
seller_obj.account_age_days is None or seller_obj.feedback_count == 0
)
needs_categories = seller_obj is None or seller_obj.category_history_json in ("{}", "", None) needs_categories = seller_obj is None or seller_obj.category_history_json in ("{}", "", None)
# Slow path: Playwright for remaining gaps (BTF + _ssn in parallel threads). # Slow path: Playwright for remaining gaps (BTF + _ssn in parallel threads).
@ -458,3 +479,95 @@ def delete_saved_search(saved_id: int, session: CloudUser = Depends(get_session)
def mark_saved_search_run(saved_id: int, session: CloudUser = Depends(get_session)): def mark_saved_search_run(saved_id: int, session: CloudUser = Depends(get_session)):
Store(session.user_db).update_saved_search_last_run(saved_id) Store(session.user_db).update_saved_search_last_run(saved_id)
return {"ok": True} return {"ok": True}
# ── Scammer Blocklist ─────────────────────────────────────────────────────────
# Blocklist lives in shared_db: all users on a shared cloud instance see the
# same community blocklist. In local (single-user) mode shared_db == user_db.
class BlocklistAdd(BaseModel):
    """Request body for POST /api/blocklist."""
    platform: str = "ebay"    # marketplace identifier; defaults to eBay
    platform_seller_id: str   # platform-native seller id (required)
    username: str             # seller's display username (required)
    reason: str = ""          # optional free-text reason for blocking
@app.get("/api/blocklist")
def list_blocklist(session: CloudUser = Depends(get_session)):
    """Return all community blocklist entries from the shared DB."""
    entries = Store(session.shared_db).list_blocklist()
    return {"entries": [dataclasses.asdict(entry) for entry in entries]}
@app.post("/api/blocklist", status_code=201)
def add_to_blocklist(body: BlocklistAdd, session: CloudUser = Depends(get_session)):
    """Add a seller to the shared blocklist; source is recorded as 'manual'."""
    record = ScammerEntry(
        platform=body.platform,
        platform_seller_id=body.platform_seller_id,
        username=body.username,
        reason=body.reason or None,
        source="manual",
    )
    saved = Store(session.shared_db).add_to_blocklist(record)
    return dataclasses.asdict(saved)
@app.delete("/api/blocklist/{platform_seller_id}", status_code=204)
def remove_from_blocklist(platform_seller_id: str, session: CloudUser = Depends(get_session)):
    """Remove a seller from the shared blocklist by platform seller id.

    NOTE(review): platform is hard-coded to "ebay" here, while POST accepts an
    arbitrary platform — confirm whether this route needs a platform parameter
    before other platforms land.
    """
    Store(session.shared_db).remove_from_blocklist("ebay", platform_seller_id)
@app.get("/api/blocklist/export")
def export_blocklist(session: CloudUser = Depends(get_session)):
    """Download the blocklist as a CSV file."""
    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(["platform", "platform_seller_id", "username", "reason", "source", "created_at"])
    # None-valued optional fields are exported as empty cells.
    writer.writerows(
        [e.platform, e.platform_seller_id, e.username,
         e.reason or "", e.source, e.created_at or ""]
        for e in Store(session.shared_db).list_blocklist()
    )
    return StreamingResponse(
        iter([buf.getvalue()]),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=snipe-blocklist.csv"},
    )
@app.post("/api/blocklist/import", status_code=201)
async def import_blocklist(
    file: UploadFile = File(...),
    session: CloudUser = Depends(get_session),
):
    """Import a CSV blocklist. Columns: platform_seller_id, username, reason (optional).

    Accepts both the full-export format (with a 'platform' column) and the
    simple format without it (platform defaults to 'ebay'). Rows missing
    either required field are skipped and reported in the 'errors' list.

    Returns:
        {"imported": <count>, "errors": [<per-row messages>]}

    Raises:
        HTTPException 400: if the upload is not valid UTF-8.
    """
    content = await file.read()
    try:
        text = content.decode("utf-8-sig")  # handle BOM from Excel exports
    except UnicodeDecodeError as err:
        # Fix: chain the cause so the original decode failure survives in
        # tracebacks/logs instead of being discarded (ruff B904).
        raise HTTPException(status_code=400, detail="File must be UTF-8 encoded") from err
    store = Store(session.shared_db)
    imported = 0
    errors: list[str] = []
    reader = csv.DictReader(io.StringIO(text))
    # Accept both full-export format (has 'platform' col) and simple format (no 'platform' col).
    # start=2: line 1 of the file is the header, so the first data row is line 2.
    for i, row in enumerate(reader, start=2):
        seller_id = (row.get("platform_seller_id") or "").strip()
        username = (row.get("username") or "").strip()
        if not seller_id or not username:
            errors.append(f"Row {i}: missing platform_seller_id or username — skipped")
            continue
        platform = (row.get("platform") or "ebay").strip()
        reason = (row.get("reason") or "").strip() or None
        store.add_to_blocklist(ScammerEntry(
            platform=platform,
            platform_seller_id=seller_id,
            username=username,
            reason=reason,
            source="csv_import",
        ))
        imported += 1
    log.info("Blocklist import: %d added, %d errors", imported, len(errors))
    return {"imported": imported, "errors": errors}

View file

@ -0,0 +1,18 @@
-- 002_background_tasks.sql
-- Shared background task queue used by the LLM/vision task scheduler.
-- Schema mirrors the circuitforge-core standard.
-- One row per unit of background work. Written by app/tasks/runner.py:
-- insert_task() creates rows as 'queued'; run_task() moves them through
-- 'running' and finally 'completed' or 'failed'.
CREATE TABLE IF NOT EXISTS background_tasks (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    task_type TEXT NOT NULL,               -- e.g. 'trust_photo_analysis'
    job_id INTEGER NOT NULL DEFAULT 0,     -- id of the domain object the task targets
    status TEXT NOT NULL DEFAULT 'queued', -- 'queued' | 'running' | 'completed' | 'failed'
    params TEXT,                           -- JSON-encoded task parameters (nullable)
    error TEXT,                            -- failure message when status = 'failed'
    stage TEXT,                            -- presumably a progress marker; not written by
                                           -- app/tasks/runner.py — confirm scheduler usage
    created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP -- bumped explicitly on status updates
);

-- Matches the runner's dedup/poll query shape (status first, then task_type).
CREATE INDEX IF NOT EXISTS idx_bg_tasks_status_type
ON background_tasks (status, task_type);

0
app/tasks/__init__.py Normal file
View file

159
app/tasks/runner.py Normal file
View file

@ -0,0 +1,159 @@
# app/tasks/runner.py
"""Snipe background task runner.
Implements the run_task_fn interface expected by circuitforge_core.tasks.scheduler.
Current task types:
trust_photo_analysis download primary photo, run vision LLM, write
result to trust_scores.photo_analysis_json (Paid tier).
Prompt note: The vision prompt is a functional first pass. Tune it against real
eBay listings before GA — specifically the stock-photo vs genuine-product
distinction and the damage vocabulary.
"""
from __future__ import annotations
import base64
import json
import logging
import sqlite3
from pathlib import Path
import requests
from circuitforge_core.llm import LLMRouter
log = logging.getLogger(__name__)
# Task types this runner handles; the scheduler routes only these to run_task.
LLM_TASK_TYPES: frozenset[str] = frozenset({"trust_photo_analysis"})

# Per-task VRAM budget consumed while a task runs (units presumably GB —
# TODO confirm against circuitforge_core.tasks.scheduler's contract).
VRAM_BUDGETS: dict[str, float] = {
    # moondream2 / vision-capable LLM — single image, short response
    "trust_photo_analysis": 2.0,
}

# System prompt for the vision model. Demands a strict JSON-only reply so
# _run_trust_photo_analysis can parse it; fenced/markdown replies are still
# tolerated by the lenient parser there.
_VISION_SYSTEM_PROMPT = (
    "You are an expert at evaluating eBay listing photos for authenticity and condition. "
    "Respond ONLY with a JSON object containing these exact keys:\n"
    " is_stock_photo: bool — true if this looks like a manufacturer/marketing image\n"
    " visible_damage: bool — true if scratches, dents, cracks, or defects are visible\n"
    " authenticity_signal: string — one of 'genuine_product_photo', 'stock_photo', 'unclear'\n"
    " confidence: string — one of 'high', 'medium', 'low'\n"
    "No explanation outside the JSON object."
)
def insert_task(
    db_path: Path,
    task_type: str,
    job_id: int,
    *,
    params: str | None = None,
) -> tuple[int, bool]:
    """Insert a background task if no identical task is already in-flight.

    Args:
        db_path: Path to the Snipe SQLite database.
        task_type: Task kind (one of LLM_TASK_TYPES).
        job_id: Id of the domain object the task targets (e.g. a listing id).
        params: Optional JSON-encoded task parameters.

    Returns:
        (task_id, created) — ``created`` is False when a 'queued' or 'running'
        task with the same (task_type, job_id) already exists, in which case
        the existing task's id is returned.

    NOTE(review): the SELECT-then-INSERT is not atomic, so two concurrent
    callers could both insert; harmless here (the task is idempotent) but a
    partial unique index would close the race.
    """
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    try:
        existing = conn.execute(
            "SELECT id FROM background_tasks "
            "WHERE task_type=? AND job_id=? AND status IN ('queued','running')",
            (task_type, job_id),
        ).fetchone()
        if existing:
            return existing["id"], False
        cursor = conn.execute(
            "INSERT INTO background_tasks (task_type, job_id, params) VALUES (?,?,?)",
            (task_type, job_id, params),
        )
        conn.commit()
        return cursor.lastrowid, True
    finally:
        # Fix: the original leaked the connection whenever execute() raised
        # (e.g. missing table, locked db); always release the handle.
        conn.close()
def _update_task_status(
    db_path: Path, task_id: int, status: str, *, error: str = ""
) -> None:
    """Persist a task's status transition, bumping updated_at."""
    sql = (
        "UPDATE background_tasks "
        "SET status=?, error=?, updated_at=CURRENT_TIMESTAMP WHERE id=?"
    )
    with sqlite3.connect(db_path) as conn:
        conn.execute(sql, (status, error, task_id))


def run_task(
    db_path: Path,
    task_id: int,
    task_type: str,
    job_id: int,
    params: str | None = None,
) -> None:
    """Execute one background task. Called by the scheduler's batch worker.

    Marks the row 'running', dispatches on task_type, and records either
    'completed' or 'failed' (with the exception text). Never re-raises, so a
    bad task cannot take the worker down.
    """
    _update_task_status(db_path, task_id, "running")
    try:
        # Guard-clause dispatch: reject unknown types up front.
        if task_type != "trust_photo_analysis":
            raise ValueError(f"Unknown snipe task type: {task_type!r}")
        _run_trust_photo_analysis(db_path, job_id, params)
        _update_task_status(db_path, task_id, "completed")
    except Exception as exc:
        logging.getLogger(__name__).exception(
            "Task %d (%s) failed: %s", task_id, task_type, exc
        )
        _update_task_status(db_path, task_id, "failed", error=str(exc))
def _run_trust_photo_analysis(
    db_path: Path,
    listing_id: int,
    params: str | None,
) -> None:
    """Download primary listing photo, run vision LLM, write to trust_scores.

    Args:
        db_path: Snipe SQLite database path.
        listing_id: trust_scores.listing_id to update (run_task passes the
            task's job_id here).
        params: JSON object with required 'photo_url' and optional
            'listing_title' (used as prompt context).

    Raises:
        ValueError: if 'photo_url' is missing — run_task marks the task failed.
        requests.RequestException: if the photo download fails.
    """
    p = json.loads(params or "{}")
    photo_url = p.get("photo_url", "")
    listing_title = p.get("listing_title", "")
    if not photo_url:
        raise ValueError("trust_photo_analysis: 'photo_url' is required in params")
    # Download and base64-encode the photo
    resp = requests.get(photo_url, timeout=10)
    resp.raise_for_status()
    image_b64 = base64.b64encode(resp.content).decode()
    # Build user prompt with optional title context
    user_prompt = "Evaluate this eBay listing photo."
    if listing_title:
        user_prompt = f"Evaluate this eBay listing photo for: {listing_title}"
    # Call LLMRouter with vision capability
    router = LLMRouter()
    raw = router.complete(
        user_prompt,
        system=_VISION_SYSTEM_PROMPT,
        images=[image_b64],
        max_tokens=128,
    )
    # Parse — be lenient: strip markdown fences if present
    try:
        cleaned = raw.strip().removeprefix("```json").removeprefix("```").removesuffix("```").strip()
        analysis = json.loads(cleaned)
    except json.JSONDecodeError:
        log.warning(
            "Vision LLM returned non-JSON for listing %d: %r", listing_id, raw[:200]
        )
        analysis = {"raw_response": raw, "parse_error": True}
    with sqlite3.connect(db_path) as conn:
        cur = conn.execute(
            "UPDATE trust_scores SET photo_analysis_json=? WHERE listing_id=?",
            (json.dumps(analysis), listing_id),
        )
        # Fix: the original silently no-opped when no trust_scores row existed
        # for this listing; surface that so the analysis isn't quietly lost.
        if cur.rowcount == 0:
            log.warning(
                "trust_photo_analysis: no trust_scores row for listing %d; "
                "analysis not persisted",
                listing_id,
            )
            return
    log.info(
        "Vision analysis for listing %d: stock=%s damage=%s confidence=%s",
        listing_id,
        analysis.get("is_stock_photo"),
        analysis.get("visible_damage"),
        analysis.get("confidence"),
    )

23
app/tasks/scheduler.py Normal file
View file

@ -0,0 +1,23 @@
# app/tasks/scheduler.py
"""Snipe LLM/vision task scheduler — thin shim over circuitforge_core.tasks.scheduler."""
from __future__ import annotations
from pathlib import Path
from circuitforge_core.tasks.scheduler import (
TaskScheduler,
get_scheduler as _base_get_scheduler,
reset_scheduler, # re-export for tests
)
from app.tasks.runner import LLM_TASK_TYPES, VRAM_BUDGETS, run_task
def get_scheduler(db_path: Path) -> TaskScheduler:
    """Return the process-level TaskScheduler singleton for Snipe.

    Thin shim over circuitforge_core's get_scheduler, pre-wired with Snipe's
    run_task entry point, its task types, and per-task VRAM budgets. Pair with
    the re-exported reset_scheduler() to tear the singleton down in tests.

    Args:
        db_path: SQLite database holding the background_tasks queue.
    """
    return _base_get_scheduler(
        db_path=db_path,
        run_task_fn=run_task,
        task_types=LLM_TASK_TYPES,
        vram_budgets=VRAM_BUDGETS,
    )

View file

View file

@ -0,0 +1,157 @@
"""Tests for snipe background task runner."""
from __future__ import annotations
import json
import sqlite3
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from app.tasks.runner import (
LLM_TASK_TYPES,
VRAM_BUDGETS,
insert_task,
run_task,
)
@pytest.fixture
def tmp_db(tmp_path: Path) -> Path:
    """Create a throwaway SQLite db with the runner's two tables.

    Fix: the original fixture omitted the 'stage' column that the real
    002_background_tasks.sql migration defines, so tests exercised a schema
    that had drifted from production. Seeds one trust_scores row
    (listing_id=1) for the photo-analysis tests.
    """
    db = tmp_path / "snipe.db"
    conn = sqlite3.connect(db)
    conn.executescript("""
        CREATE TABLE background_tasks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            task_type TEXT NOT NULL,
            job_id INTEGER NOT NULL DEFAULT 0,
            status TEXT NOT NULL DEFAULT 'queued',
            params TEXT,
            error TEXT,
            stage TEXT,
            created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
            updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
        );
        CREATE TABLE trust_scores (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            listing_id INTEGER NOT NULL,
            composite_score INTEGER NOT NULL DEFAULT 0,
            photo_analysis_json TEXT,
            red_flags_json TEXT NOT NULL DEFAULT '[]',
            scored_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
        );
        INSERT INTO trust_scores (listing_id, composite_score) VALUES (1, 72);
    """)
    conn.commit()
    conn.close()
    return db
def test_llm_task_types_defined():
    """The runner must register the photo-analysis task type."""
    assert {"trust_photo_analysis"} <= LLM_TASK_TYPES
def test_vram_budgets_defined():
    """Photo analysis must declare a positive VRAM budget."""
    budget = VRAM_BUDGETS.get("trust_photo_analysis")
    assert budget is not None
    assert budget > 0
def test_insert_task_creates_row(tmp_db: Path):
    """A fresh insert produces a new row that starts out 'queued'."""
    task_id, created = insert_task(tmp_db, "trust_photo_analysis", job_id=1)
    assert created is True
    conn = sqlite3.connect(tmp_db)
    try:
        (status,) = conn.execute(
            "SELECT status FROM background_tasks WHERE id=?", (task_id,)
        ).fetchone()
    finally:
        conn.close()
    assert status == "queued"
def test_insert_task_dedup(tmp_db: Path):
    """Re-queuing the same (task_type, job_id) returns the existing task."""
    first = insert_task(tmp_db, "trust_photo_analysis", job_id=1)
    second = insert_task(tmp_db, "trust_photo_analysis", job_id=1)
    assert first[1] is True
    assert second[1] is False
    assert second[0] == first[0]
def test_run_task_photo_analysis_success(tmp_db: Path):
    """Vision analysis result is written to trust_scores.photo_analysis_json."""
    params = json.dumps({
        "listing_id": 1,
        "photo_url": "https://example.com/photo.jpg",
        "listing_title": "Used iPhone 13",
    })
    task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=params)
    llm_reply = json.dumps({
        "is_stock_photo": False,
        "visible_damage": False,
        "authenticity_signal": "genuine_product_photo",
        "confidence": "high",
    })
    with patch("app.tasks.runner.requests") as fake_requests, \
            patch("app.tasks.runner.LLMRouter") as FakeRouter:
        response = fake_requests.get.return_value
        response.content = b"fake_image_bytes"
        response.raise_for_status = lambda: None
        FakeRouter.return_value.complete.return_value = llm_reply
        run_task(tmp_db, task_id, "trust_photo_analysis", 1, params)
    conn = sqlite3.connect(tmp_db)
    try:
        photo_json = conn.execute(
            "SELECT photo_analysis_json FROM trust_scores WHERE listing_id=1"
        ).fetchone()[0]
        task_status = conn.execute(
            "SELECT status FROM background_tasks WHERE id=?", (task_id,)
        ).fetchone()[0]
    finally:
        conn.close()
    assert task_status == "completed"
    assert json.loads(photo_json)["is_stock_photo"] is False
def test_run_task_photo_fetch_failure_marks_failed(tmp_db: Path):
    """If photo download fails, task is marked failed without crashing."""
    params = json.dumps({
        "listing_id": 1,
        "photo_url": "https://example.com/bad.jpg",
        "listing_title": "Laptop",
    })
    task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=params)
    with patch("app.tasks.runner.requests") as fake_requests:
        fake_requests.get.side_effect = ConnectionError("fetch failed")
        run_task(tmp_db, task_id, "trust_photo_analysis", 1, params)
    conn = sqlite3.connect(tmp_db)
    try:
        status, error = conn.execute(
            "SELECT status, error FROM background_tasks WHERE id=?", (task_id,)
        ).fetchone()
    finally:
        conn.close()
    assert status == "failed"
    assert "fetch failed" in error
def test_run_task_no_photo_url_marks_failed(tmp_db: Path):
    """A task whose params lack photo_url fails with an explanatory error."""
    params = json.dumps({"listing_id": 1})
    task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=params)
    run_task(tmp_db, task_id, "trust_photo_analysis", 1, params)
    conn = sqlite3.connect(tmp_db)
    try:
        status, error = conn.execute(
            "SELECT status, error FROM background_tasks WHERE id=?", (task_id,)
        ).fetchone()
    finally:
        conn.close()
    assert status == "failed"
    assert "photo_url" in error
def test_run_task_unknown_type_marks_failed(tmp_db: Path):
    """An unrecognized task type is recorded as failed rather than raising."""
    task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1)
    run_task(tmp_db, task_id, "unknown_type", 1, None)
    conn = sqlite3.connect(tmp_db)
    try:
        (status,) = conn.execute(
            "SELECT status FROM background_tasks WHERE id=?", (task_id,)
        ).fetchone()
    finally:
        conn.close()
    assert status == "failed"