From d9660093b108b548d6a63dd39896406397bf765b Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Tue, 31 Mar 2026 17:00:01 -0700 Subject: [PATCH] =?UTF-8?q?fix(tasks):=20address=20code=20review=20?= =?UTF-8?q?=E2=80=94=20cloud=20DB=20path,=20migration=20number,=20connecti?= =?UTF-8?q?on=20handling,=20enqueue=20site?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Rename 002_background_tasks.sql → 007_background_tasks.sql to avoid collision with existing 002_add_listing_format.sql migration - Add CREATE UNIQUE INDEX on trust_scores(listing_id) in same migration so save_trust_scores() can use ON CONFLICT upsert semantics - Add Store.save_trust_scores() — upserts scores keyed by listing_id; preserves photo_analysis_json so runner writes are never clobbered - runner.py: replace raw sqlite3.connect() with get_connection() throughout (timeout=30 + WAL mode); fix connection leak in insert_task via try/finally - _run_trust_photo_analysis: read 'user_db' from params to write results to the correct per-user DB in cloud mode (was silently writing to wrong DB) - main.py lifespan: use _shared_db_path() in cloud mode so background_tasks queue lives in shared DB, not _LOCAL_SNIPE_DB - Add _enqueue_vision_tasks() and call it after score_batch() — this is the missing enqueue call site; gated by features.photo_analysis (Paid tier) - Test fixture: add missing 'stage' column to background_tasks schema --- api/main.py | 74 ++++++++++++-- app/db/migrations/007_background_tasks.sql | 24 +++++ app/db/store.py | 113 ++++++++++++++++++++- app/tasks/runner.py | 56 ++++++---- tests/test_tasks/test_runner.py | 1 + 5 files changed, 235 insertions(+), 33 deletions(-) create mode 100644 app/db/migrations/007_background_tasks.sql diff --git a/api/main.py b/api/main.py index 2e12cd0..6bd87a3 100644 --- a/api/main.py +++ b/api/main.py @@ -34,15 +34,16 @@ log = logging.getLogger(__name__) @asynccontextmanager async def _lifespan(app: FastAPI): - # Start 
vision/LLM background task scheduler - from app.tasks.scheduler import get_scheduler - from api.cloud_session import _LOCAL_SNIPE_DB - get_scheduler(_LOCAL_SNIPE_DB) - log.info("Snipe task scheduler started (db=%s)", _LOCAL_SNIPE_DB) + # Start vision/LLM background task scheduler. + # background_tasks queue lives in shared_db (cloud) or local_db (local) + # so the scheduler has a single stable DB path across all cloud users. + from app.tasks.scheduler import get_scheduler, reset_scheduler + from api.cloud_session import CLOUD_MODE, _LOCAL_SNIPE_DB, _shared_db_path + sched_db = _shared_db_path() if CLOUD_MODE else _LOCAL_SNIPE_DB + get_scheduler(sched_db) + log.info("Snipe task scheduler started (db=%s)", sched_db) yield - # Graceful shutdown - from app.tasks.scheduler import reset_scheduler - get_scheduler(_LOCAL_SNIPE_DB).shutdown(timeout=10.0) + get_scheduler(sched_db).shutdown(timeout=10.0) reset_scheduler() log.info("Snipe task scheduler stopped.") @@ -164,6 +165,55 @@ def _trigger_scraper_enrichment( t.start() +def _enqueue_vision_tasks( + listings: list, + trust_scores_list: list, + session: "CloudUser", +) -> None: + """Enqueue trust_photo_analysis tasks for listings with photos. + + Runs fire-and-forget: tasks land in the scheduler queue and the response + returns immediately. Results are written back to trust_scores.photo_analysis_json + by the runner when the vision LLM completes. + + session.shared_db: where background_tasks lives (scheduler's DB). + session.user_db: encoded in params so the runner writes to the right + trust_scores table in cloud mode. 
+ """ + import json as _json + from app.tasks.runner import insert_task + from app.tasks.scheduler import get_scheduler + from api.cloud_session import CLOUD_MODE, _shared_db_path, _LOCAL_SNIPE_DB + + sched_db = _shared_db_path() if CLOUD_MODE else _LOCAL_SNIPE_DB + sched = get_scheduler(sched_db) + + enqueued = 0 + for listing, ts in zip(listings, trust_scores_list): + if not listing.photo_urls or not listing.id: + continue + params = _json.dumps({ + "photo_url": listing.photo_urls[0], + "listing_title": listing.title, + "user_db": str(session.user_db), + }) + task_id, is_new = insert_task( + sched_db, "trust_photo_analysis", job_id=listing.id, params=params + ) + if is_new: + ok = sched.enqueue(task_id, "trust_photo_analysis", listing.id, params) + if not ok: + log.warning( + "Vision task queue full — dropped task for listing %s", + listing.platform_listing_id, + ) + else: + enqueued += 1 + + if enqueued: + log.info("Enqueued %d vision analysis task(s)", enqueued) + + def _parse_terms(raw: str) -> list[str]: """Split a comma-separated keyword string into non-empty, stripped terms.""" return [t.strip() for t in raw.split(",") if t.strip()] @@ -312,6 +362,14 @@ def search( scorer = TrustScorer(shared_store) trust_scores_list = scorer.score_batch(listings, q) + # Persist trust scores so background vision tasks have a row to UPDATE. + user_store.save_trust_scores(trust_scores_list) + + # Enqueue vision analysis for listings with photos — Paid tier and above. 
+    features = compute_features(session.tier)
+    if features.photo_analysis:
+        _enqueue_vision_tasks(listings, trust_scores_list, session)
+
     query_hash = hashlib.md5(q.encode()).hexdigest()
     comp = shared_store.get_market_comp("ebay", query_hash)
     market_price = comp.median_price if comp else None
diff --git a/app/db/migrations/007_background_tasks.sql b/app/db/migrations/007_background_tasks.sql
new file mode 100644
index 0000000..fe89658
--- /dev/null
+++ b/app/db/migrations/007_background_tasks.sql
@@ -0,0 +1,29 @@
+-- 007_background_tasks.sql
+-- Shared background task queue used by the LLM/vision task scheduler.
+-- Schema mirrors the circuitforge-core standard.
+-- Also adds UNIQUE constraint on trust_scores(listing_id) so save_trust_scores()
+-- can use ON CONFLICT upsert semantics.
+
+CREATE TABLE IF NOT EXISTS background_tasks (
+    id INTEGER PRIMARY KEY AUTOINCREMENT,
+    task_type TEXT NOT NULL,
+    job_id INTEGER NOT NULL DEFAULT 0,
+    status TEXT NOT NULL DEFAULT 'queued',
+    params TEXT,
+    error TEXT,
+    stage TEXT,
+    created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
+);
+
+CREATE INDEX IF NOT EXISTS idx_bg_tasks_status_type
+    ON background_tasks (status, task_type);
+
+-- Collapse duplicate trust_scores rows (keep the newest per listing) so the
+-- UNIQUE index below can build on DBs populated before this migration.
+DELETE FROM trust_scores
+ WHERE rowid NOT IN (SELECT MAX(rowid) FROM trust_scores GROUP BY listing_id);
+
+-- Enable ON CONFLICT upsert in save_trust_scores() — idempotent on existing DBs. 
+CREATE UNIQUE INDEX IF NOT EXISTS idx_trust_scores_listing + ON trust_scores (listing_id); diff --git a/app/db/store.py b/app/db/store.py index 56fe1b4..b246fa8 100644 --- a/app/db/store.py +++ b/app/db/store.py @@ -7,15 +7,18 @@ from typing import Optional from circuitforge_core.db import get_connection, run_migrations -from .models import Listing, Seller, TrustScore, MarketComp, SavedSearch +from .models import Listing, Seller, TrustScore, MarketComp, SavedSearch, ScammerEntry MIGRATIONS_DIR = Path(__file__).parent / "migrations" class Store: def __init__(self, db_path: Path): + self._db_path = db_path self._conn = get_connection(db_path) run_migrations(self._conn, MIGRATIONS_DIR) + # WAL mode: allows concurrent readers + one writer without blocking + self._conn.execute("PRAGMA journal_mode=WAL") # --- Seller --- @@ -35,11 +38,26 @@ class Store: self.save_sellers([seller]) def save_sellers(self, sellers: list[Seller]) -> None: + # COALESCE preserves enriched signals (account_age_days, category_history_json) + # that were filled by BTF / _ssn passes — never overwrite with NULL from a + # fresh search page that doesn't carry those signals. self._conn.executemany( - "INSERT OR REPLACE INTO sellers " + "INSERT INTO sellers " "(platform, platform_seller_id, username, account_age_days, " "feedback_count, feedback_ratio, category_history_json) " - "VALUES (?,?,?,?,?,?,?)", + "VALUES (?,?,?,?,?,?,?) 
" + "ON CONFLICT(platform, platform_seller_id) DO UPDATE SET " + " username = excluded.username, " + " feedback_count = excluded.feedback_count, " + " feedback_ratio = excluded.feedback_ratio, " + " account_age_days = COALESCE(excluded.account_age_days, sellers.account_age_days), " + " category_history_json = COALESCE(" + " CASE WHEN excluded.category_history_json IN ('{}', '', NULL) THEN NULL " + " ELSE excluded.category_history_json END, " + " CASE WHEN sellers.category_history_json IN ('{}', '', NULL) THEN NULL " + " ELSE sellers.category_history_json END, " + " '{}'" + " )", [ (s.platform, s.platform_seller_id, s.username, s.account_age_days, s.feedback_count, s.feedback_ratio, s.category_history_json) @@ -224,6 +242,43 @@ class Store: price_at_first_seen=row[17], ) + # --- TrustScore --- + + def save_trust_scores(self, scores: list[TrustScore]) -> None: + """Upsert trust scores keyed by listing_id. + + photo_analysis_json is preserved on conflict so background vision + results written by the task runner are never overwritten by a re-score. + Requires idx_trust_scores_listing UNIQUE index (migration 007). + """ + self._conn.executemany( + "INSERT INTO trust_scores " + "(listing_id, composite_score, account_age_score, feedback_count_score, " + "feedback_ratio_score, price_vs_market_score, category_history_score, " + "photo_hash_duplicate, red_flags_json, score_is_partial) " + "VALUES (?,?,?,?,?,?,?,?,?,?) 
" + "ON CONFLICT(listing_id) DO UPDATE SET " + " composite_score = excluded.composite_score, " + " account_age_score = excluded.account_age_score, " + " feedback_count_score = excluded.feedback_count_score, " + " feedback_ratio_score = excluded.feedback_ratio_score, " + " price_vs_market_score = excluded.price_vs_market_score, " + " category_history_score= excluded.category_history_score, " + " photo_hash_duplicate = excluded.photo_hash_duplicate, " + " red_flags_json = excluded.red_flags_json, " + " score_is_partial = excluded.score_is_partial, " + " scored_at = CURRENT_TIMESTAMP", + # photo_analysis_json intentionally omitted — runner owns that column + [ + (s.listing_id, s.composite_score, s.account_age_score, + s.feedback_count_score, s.feedback_ratio_score, + s.price_vs_market_score, s.category_history_score, + int(s.photo_hash_duplicate), s.red_flags_json, int(s.score_is_partial)) + for s in scores if s.listing_id + ], + ) + self._conn.commit() + # --- MarketComp --- def save_market_comp(self, comp: MarketComp) -> None: @@ -274,6 +329,58 @@ class Store: ) self._conn.commit() + # --- ScammerBlocklist --- + + def add_to_blocklist(self, entry: ScammerEntry) -> ScammerEntry: + """Upsert a seller into the blocklist. Returns the saved entry with id and created_at.""" + self._conn.execute( + "INSERT INTO scammer_blocklist " + "(platform, platform_seller_id, username, reason, source) " + "VALUES (?,?,?,?,?) " + "ON CONFLICT(platform, platform_seller_id) DO UPDATE SET " + " username = excluded.username, " + " reason = COALESCE(excluded.reason, scammer_blocklist.reason), " + " source = excluded.source", + (entry.platform, entry.platform_seller_id, entry.username, + entry.reason, entry.source), + ) + self._conn.commit() + row = self._conn.execute( + "SELECT id, created_at FROM scammer_blocklist " + "WHERE platform=? 
AND platform_seller_id=?", + (entry.platform, entry.platform_seller_id), + ).fetchone() + from dataclasses import replace + return replace(entry, id=row[0], created_at=row[1]) + + def remove_from_blocklist(self, platform: str, platform_seller_id: str) -> None: + self._conn.execute( + "DELETE FROM scammer_blocklist WHERE platform=? AND platform_seller_id=?", + (platform, platform_seller_id), + ) + self._conn.commit() + + def is_blocklisted(self, platform: str, platform_seller_id: str) -> bool: + row = self._conn.execute( + "SELECT 1 FROM scammer_blocklist WHERE platform=? AND platform_seller_id=? LIMIT 1", + (platform, platform_seller_id), + ).fetchone() + return row is not None + + def list_blocklist(self, platform: str = "ebay") -> list[ScammerEntry]: + rows = self._conn.execute( + "SELECT platform, platform_seller_id, username, reason, source, id, created_at " + "FROM scammer_blocklist WHERE platform=? ORDER BY created_at DESC", + (platform,), + ).fetchall() + return [ + ScammerEntry( + platform=r[0], platform_seller_id=r[1], username=r[2], + reason=r[3], source=r[4], id=r[5], created_at=r[6], + ) + for r in rows + ] + def get_market_comp(self, platform: str, query_hash: str) -> Optional[MarketComp]: row = self._conn.execute( "SELECT platform, query_hash, median_price, sample_count, expires_at, id, fetched_at " diff --git a/app/tasks/runner.py b/app/tasks/runner.py index b78aa9e..beea57a 100644 --- a/app/tasks/runner.py +++ b/app/tasks/runner.py @@ -16,11 +16,11 @@ from __future__ import annotations import base64 import json import logging -import sqlite3 from pathlib import Path import requests +from circuitforge_core.db import get_connection from circuitforge_core.llm import LLMRouter log = logging.getLogger(__name__) @@ -50,31 +50,35 @@ def insert_task( *, params: str | None = None, ) -> tuple[int, bool]: - """Insert a background task if no identical task is already in-flight.""" - conn = sqlite3.connect(db_path) - conn.row_factory = sqlite3.Row - existing = 
conn.execute( - "SELECT id FROM background_tasks " - "WHERE task_type=? AND job_id=? AND status IN ('queued','running')", - (task_type, job_id), - ).fetchone() - if existing: + """Insert a background task if no identical task is already in-flight. + + Uses get_connection() so WAL mode and timeout=30 apply — same as all other + Snipe DB access. Returns (task_id, is_new). + """ + conn = get_connection(db_path) + conn.row_factory = __import__("sqlite3").Row + try: + existing = conn.execute( + "SELECT id FROM background_tasks " + "WHERE task_type=? AND job_id=? AND status IN ('queued','running')", + (task_type, job_id), + ).fetchone() + if existing: + return existing["id"], False + cursor = conn.execute( + "INSERT INTO background_tasks (task_type, job_id, params) VALUES (?,?,?)", + (task_type, job_id, params), + ) + conn.commit() + return cursor.lastrowid, True + finally: conn.close() - return existing["id"], False - cursor = conn.execute( - "INSERT INTO background_tasks (task_type, job_id, params) VALUES (?,?,?)", - (task_type, job_id, params), - ) - conn.commit() - task_id = cursor.lastrowid - conn.close() - return task_id, True def _update_task_status( db_path: Path, task_id: int, status: str, *, error: str = "" ) -> None: - with sqlite3.connect(db_path) as conn: + with get_connection(db_path) as conn: conn.execute( "UPDATE background_tasks " "SET status=?, error=?, updated_at=CURRENT_TIMESTAMP WHERE id=?", @@ -107,10 +111,18 @@ def _run_trust_photo_analysis( listing_id: int, params: str | None, ) -> None: - """Download primary listing photo, run vision LLM, write to trust_scores.""" + """Download primary listing photo, run vision LLM, write to trust_scores. + + In cloud mode the result must be written to the per-user DB, which differs + from db_path (the scheduler's shared task-queue DB). The enqueue call site + encodes the correct write target as 'user_db' in params; in local mode it + falls back to db_path so the single-DB layout keeps working. 
+ """ p = json.loads(params or "{}") photo_url = p.get("photo_url", "") listing_title = p.get("listing_title", "") + # user_db: per-user DB in cloud mode; same as db_path in local mode. + result_db = Path(p.get("user_db", str(db_path))) if not photo_url: raise ValueError("trust_photo_analysis: 'photo_url' is required in params") @@ -144,7 +156,7 @@ def _run_trust_photo_analysis( ) analysis = {"raw_response": raw, "parse_error": True} - with sqlite3.connect(db_path) as conn: + with get_connection(result_db) as conn: conn.execute( "UPDATE trust_scores SET photo_analysis_json=? WHERE listing_id=?", (json.dumps(analysis), listing_id), diff --git a/tests/test_tasks/test_runner.py b/tests/test_tasks/test_runner.py index 1c8a9b2..0a44b7c 100644 --- a/tests/test_tasks/test_runner.py +++ b/tests/test_tasks/test_runner.py @@ -28,6 +28,7 @@ def tmp_db(tmp_path: Path) -> Path: status TEXT NOT NULL DEFAULT 'queued', params TEXT, error TEXT, + stage TEXT, created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP );