fix(tasks): address code review — cloud DB path, migration number, connection handling, enqueue site

- Rename 002_background_tasks.sql → 007_background_tasks.sql to avoid
  collision with existing 002_add_listing_format.sql migration
- Add CREATE UNIQUE INDEX on trust_scores(listing_id) in same migration
  so save_trust_scores() can use ON CONFLICT upsert semantics
- Add Store.save_trust_scores() — upserts scores keyed by listing_id;
  preserves photo_analysis_json so runner writes are never clobbered
- runner.py: replace raw sqlite3.connect() with get_connection() throughout
  (timeout=30 + WAL mode); fix connection leak in insert_task via try/finally
- _run_trust_photo_analysis: read 'user_db' from params to write results to
  the correct per-user DB in cloud mode (was silently writing to wrong DB)
- main.py lifespan: use _shared_db_path() in cloud mode so background_tasks
  queue lives in shared DB, not _LOCAL_SNIPE_DB
- Add _enqueue_vision_tasks() and call it after score_batch() — this is the
  missing enqueue call site; gated by features.photo_analysis (Paid tier)
- Test fixture: add missing 'stage' column to background_tasks schema
This commit is contained in:
pyr0ball 2026-03-31 17:00:01 -07:00
parent f7c5e8dc17
commit d9660093b1
5 changed files with 235 additions and 33 deletions

View file

@ -34,15 +34,16 @@ log = logging.getLogger(__name__)
@asynccontextmanager @asynccontextmanager
async def _lifespan(app: FastAPI): async def _lifespan(app: FastAPI):
# Start vision/LLM background task scheduler # Start vision/LLM background task scheduler.
from app.tasks.scheduler import get_scheduler # background_tasks queue lives in shared_db (cloud) or local_db (local)
from api.cloud_session import _LOCAL_SNIPE_DB # so the scheduler has a single stable DB path across all cloud users.
get_scheduler(_LOCAL_SNIPE_DB) from app.tasks.scheduler import get_scheduler, reset_scheduler
log.info("Snipe task scheduler started (db=%s)", _LOCAL_SNIPE_DB) from api.cloud_session import CLOUD_MODE, _LOCAL_SNIPE_DB, _shared_db_path
sched_db = _shared_db_path() if CLOUD_MODE else _LOCAL_SNIPE_DB
get_scheduler(sched_db)
log.info("Snipe task scheduler started (db=%s)", sched_db)
yield yield
# Graceful shutdown get_scheduler(sched_db).shutdown(timeout=10.0)
from app.tasks.scheduler import reset_scheduler
get_scheduler(_LOCAL_SNIPE_DB).shutdown(timeout=10.0)
reset_scheduler() reset_scheduler()
log.info("Snipe task scheduler stopped.") log.info("Snipe task scheduler stopped.")
@ -164,6 +165,55 @@ def _trigger_scraper_enrichment(
t.start() t.start()
def _enqueue_vision_tasks(
    listings: list,
    trust_scores_list: list,
    session: "CloudUser",
) -> None:
    """Enqueue trust_photo_analysis tasks for listings that have photos.

    Fire-and-forget: tasks land in the scheduler queue and the caller's
    response returns immediately.  Results are written back to
    trust_scores.photo_analysis_json by the runner when the vision LLM
    completes.

    Args:
        listings: scored listings; only those with a truthy ``photo_urls``
            and ``id`` are enqueued.
        trust_scores_list: iterated in lock-step with ``listings`` (zip
            truncates to the shorter sequence); the score values themselves
            are not read here.
        session: ``session.user_db`` is encoded in params so the runner
            writes to the correct per-user trust_scores table in cloud mode.
    """
    import json as _json
    from app.tasks.runner import insert_task
    from app.tasks.scheduler import get_scheduler
    from api.cloud_session import CLOUD_MODE, _shared_db_path, _LOCAL_SNIPE_DB

    # background_tasks lives in the shared DB (cloud) or local DB (local)
    # so the scheduler has one stable queue path.
    sched_db = _shared_db_path() if CLOUD_MODE else _LOCAL_SNIPE_DB
    sched = get_scheduler(sched_db)

    enqueued = 0
    # `_score` was previously bound as `ts` but never used; zip is kept only
    # to pair the sequences and truncate to the shorter one.
    for listing, _score in zip(listings, trust_scores_list):
        if not listing.photo_urls or not listing.id:
            continue
        params = _json.dumps({
            "photo_url": listing.photo_urls[0],
            "listing_title": listing.title,
            # Runner writes vision results to this DB (per-user in cloud mode).
            "user_db": str(session.user_db),
        })
        task_id, is_new = insert_task(
            sched_db, "trust_photo_analysis", job_id=listing.id, params=params
        )
        if is_new:
            ok = sched.enqueue(task_id, "trust_photo_analysis", listing.id, params)
            if not ok:
                log.warning(
                    "Vision task queue full — dropped task for listing %s",
                    listing.platform_listing_id,
                )
            else:
                enqueued += 1
    if enqueued:
        log.info("Enqueued %d vision analysis task(s)", enqueued)
def _parse_terms(raw: str) -> list[str]: def _parse_terms(raw: str) -> list[str]:
"""Split a comma-separated keyword string into non-empty, stripped terms.""" """Split a comma-separated keyword string into non-empty, stripped terms."""
return [t.strip() for t in raw.split(",") if t.strip()] return [t.strip() for t in raw.split(",") if t.strip()]
@ -312,6 +362,14 @@ def search(
scorer = TrustScorer(shared_store) scorer = TrustScorer(shared_store)
trust_scores_list = scorer.score_batch(listings, q) trust_scores_list = scorer.score_batch(listings, q)
# Persist trust scores so background vision tasks have a row to UPDATE.
user_store.save_trust_scores(trust_scores_list)
# Enqueue vision analysis for listings with photos — Paid tier and above.
features = compute_features(session.tier)
if features.photo_analysis:
_enqueue_vision_tasks(listings, trust_scores_list, session)
query_hash = hashlib.md5(q.encode()).hexdigest() query_hash = hashlib.md5(q.encode()).hexdigest()
comp = shared_store.get_market_comp("ebay", query_hash) comp = shared_store.get_market_comp("ebay", query_hash)
market_price = comp.median_price if comp else None market_price = comp.median_price if comp else None

View file

@ -0,0 +1,24 @@
-- 007_background_tasks.sql
-- Shared background task queue used by the LLM/vision task scheduler.
-- Schema mirrors the circuitforge-core standard.
-- Also adds UNIQUE constraint on trust_scores(listing_id) so save_trust_scores()
-- can use ON CONFLICT upsert semantics.

CREATE TABLE IF NOT EXISTS background_tasks (
    id         INTEGER PRIMARY KEY AUTOINCREMENT,
    task_type  TEXT NOT NULL,                           -- e.g. 'trust_photo_analysis'
    job_id     INTEGER NOT NULL DEFAULT 0,              -- id of the entity the task targets
    status     TEXT NOT NULL DEFAULT 'queued',          -- 'queued'/'running' are in-flight (see runner); other terminal states TBD — confirm against runner
    params     TEXT,                                    -- JSON-encoded task parameters
    error      TEXT,                                    -- failure message written by the runner
    stage      TEXT,                                    -- optional progress marker
    created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);

-- The scheduler polls by (status, task_type); this index keeps that scan cheap.
CREATE INDEX IF NOT EXISTS idx_bg_tasks_status_type
    ON background_tasks (status, task_type);

-- Enable ON CONFLICT upsert in save_trust_scores() — idempotent on existing DBs.
CREATE UNIQUE INDEX IF NOT EXISTS idx_trust_scores_listing
    ON trust_scores (listing_id);

View file

@ -7,15 +7,18 @@ from typing import Optional
from circuitforge_core.db import get_connection, run_migrations from circuitforge_core.db import get_connection, run_migrations
from .models import Listing, Seller, TrustScore, MarketComp, SavedSearch from .models import Listing, Seller, TrustScore, MarketComp, SavedSearch, ScammerEntry
MIGRATIONS_DIR = Path(__file__).parent / "migrations" MIGRATIONS_DIR = Path(__file__).parent / "migrations"
class Store: class Store:
def __init__(self, db_path: Path): def __init__(self, db_path: Path):
self._db_path = db_path
self._conn = get_connection(db_path) self._conn = get_connection(db_path)
run_migrations(self._conn, MIGRATIONS_DIR) run_migrations(self._conn, MIGRATIONS_DIR)
# WAL mode: allows concurrent readers + one writer without blocking
self._conn.execute("PRAGMA journal_mode=WAL")
# --- Seller --- # --- Seller ---
@ -35,11 +38,26 @@ class Store:
self.save_sellers([seller]) self.save_sellers([seller])
def save_sellers(self, sellers: list[Seller]) -> None: def save_sellers(self, sellers: list[Seller]) -> None:
# COALESCE preserves enriched signals (account_age_days, category_history_json)
# that were filled by BTF / _ssn passes — never overwrite with NULL from a
# fresh search page that doesn't carry those signals.
self._conn.executemany( self._conn.executemany(
"INSERT OR REPLACE INTO sellers " "INSERT INTO sellers "
"(platform, platform_seller_id, username, account_age_days, " "(platform, platform_seller_id, username, account_age_days, "
"feedback_count, feedback_ratio, category_history_json) " "feedback_count, feedback_ratio, category_history_json) "
"VALUES (?,?,?,?,?,?,?)", "VALUES (?,?,?,?,?,?,?) "
"ON CONFLICT(platform, platform_seller_id) DO UPDATE SET "
" username = excluded.username, "
" feedback_count = excluded.feedback_count, "
" feedback_ratio = excluded.feedback_ratio, "
" account_age_days = COALESCE(excluded.account_age_days, sellers.account_age_days), "
" category_history_json = COALESCE("
" CASE WHEN excluded.category_history_json IN ('{}', '', NULL) THEN NULL "
" ELSE excluded.category_history_json END, "
" CASE WHEN sellers.category_history_json IN ('{}', '', NULL) THEN NULL "
" ELSE sellers.category_history_json END, "
" '{}'"
" )",
[ [
(s.platform, s.platform_seller_id, s.username, s.account_age_days, (s.platform, s.platform_seller_id, s.username, s.account_age_days,
s.feedback_count, s.feedback_ratio, s.category_history_json) s.feedback_count, s.feedback_ratio, s.category_history_json)
@ -224,6 +242,43 @@ class Store:
price_at_first_seen=row[17], price_at_first_seen=row[17],
) )
# --- TrustScore ---
def save_trust_scores(self, scores: list[TrustScore]) -> None:
"""Upsert trust scores keyed by listing_id.
photo_analysis_json is preserved on conflict so background vision
results written by the task runner are never overwritten by a re-score.
Requires idx_trust_scores_listing UNIQUE index (migration 007).
"""
self._conn.executemany(
"INSERT INTO trust_scores "
"(listing_id, composite_score, account_age_score, feedback_count_score, "
"feedback_ratio_score, price_vs_market_score, category_history_score, "
"photo_hash_duplicate, red_flags_json, score_is_partial) "
"VALUES (?,?,?,?,?,?,?,?,?,?) "
"ON CONFLICT(listing_id) DO UPDATE SET "
" composite_score = excluded.composite_score, "
" account_age_score = excluded.account_age_score, "
" feedback_count_score = excluded.feedback_count_score, "
" feedback_ratio_score = excluded.feedback_ratio_score, "
" price_vs_market_score = excluded.price_vs_market_score, "
" category_history_score= excluded.category_history_score, "
" photo_hash_duplicate = excluded.photo_hash_duplicate, "
" red_flags_json = excluded.red_flags_json, "
" score_is_partial = excluded.score_is_partial, "
" scored_at = CURRENT_TIMESTAMP",
# photo_analysis_json intentionally omitted — runner owns that column
[
(s.listing_id, s.composite_score, s.account_age_score,
s.feedback_count_score, s.feedback_ratio_score,
s.price_vs_market_score, s.category_history_score,
int(s.photo_hash_duplicate), s.red_flags_json, int(s.score_is_partial))
for s in scores if s.listing_id
],
)
self._conn.commit()
# --- MarketComp --- # --- MarketComp ---
def save_market_comp(self, comp: MarketComp) -> None: def save_market_comp(self, comp: MarketComp) -> None:
@ -274,6 +329,58 @@ class Store:
) )
self._conn.commit() self._conn.commit()
# --- ScammerBlocklist ---
def add_to_blocklist(self, entry: ScammerEntry) -> ScammerEntry:
"""Upsert a seller into the blocklist. Returns the saved entry with id and created_at."""
self._conn.execute(
"INSERT INTO scammer_blocklist "
"(platform, platform_seller_id, username, reason, source) "
"VALUES (?,?,?,?,?) "
"ON CONFLICT(platform, platform_seller_id) DO UPDATE SET "
" username = excluded.username, "
" reason = COALESCE(excluded.reason, scammer_blocklist.reason), "
" source = excluded.source",
(entry.platform, entry.platform_seller_id, entry.username,
entry.reason, entry.source),
)
self._conn.commit()
row = self._conn.execute(
"SELECT id, created_at FROM scammer_blocklist "
"WHERE platform=? AND platform_seller_id=?",
(entry.platform, entry.platform_seller_id),
).fetchone()
from dataclasses import replace
return replace(entry, id=row[0], created_at=row[1])
def remove_from_blocklist(self, platform: str, platform_seller_id: str) -> None:
self._conn.execute(
"DELETE FROM scammer_blocklist WHERE platform=? AND platform_seller_id=?",
(platform, platform_seller_id),
)
self._conn.commit()
def is_blocklisted(self, platform: str, platform_seller_id: str) -> bool:
row = self._conn.execute(
"SELECT 1 FROM scammer_blocklist WHERE platform=? AND platform_seller_id=? LIMIT 1",
(platform, platform_seller_id),
).fetchone()
return row is not None
def list_blocklist(self, platform: str = "ebay") -> list[ScammerEntry]:
rows = self._conn.execute(
"SELECT platform, platform_seller_id, username, reason, source, id, created_at "
"FROM scammer_blocklist WHERE platform=? ORDER BY created_at DESC",
(platform,),
).fetchall()
return [
ScammerEntry(
platform=r[0], platform_seller_id=r[1], username=r[2],
reason=r[3], source=r[4], id=r[5], created_at=r[6],
)
for r in rows
]
def get_market_comp(self, platform: str, query_hash: str) -> Optional[MarketComp]: def get_market_comp(self, platform: str, query_hash: str) -> Optional[MarketComp]:
row = self._conn.execute( row = self._conn.execute(
"SELECT platform, query_hash, median_price, sample_count, expires_at, id, fetched_at " "SELECT platform, query_hash, median_price, sample_count, expires_at, id, fetched_at "

View file

@ -16,11 +16,11 @@ from __future__ import annotations
import base64 import base64
import json import json
import logging import logging
import sqlite3
from pathlib import Path from pathlib import Path
import requests import requests
from circuitforge_core.db import get_connection
from circuitforge_core.llm import LLMRouter from circuitforge_core.llm import LLMRouter
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -50,31 +50,35 @@ def insert_task(
*, *,
params: str | None = None, params: str | None = None,
) -> tuple[int, bool]: ) -> tuple[int, bool]:
"""Insert a background task if no identical task is already in-flight.""" """Insert a background task if no identical task is already in-flight.
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row Uses get_connection() so WAL mode and timeout=30 apply same as all other
Snipe DB access. Returns (task_id, is_new).
"""
conn = get_connection(db_path)
conn.row_factory = __import__("sqlite3").Row
try:
existing = conn.execute( existing = conn.execute(
"SELECT id FROM background_tasks " "SELECT id FROM background_tasks "
"WHERE task_type=? AND job_id=? AND status IN ('queued','running')", "WHERE task_type=? AND job_id=? AND status IN ('queued','running')",
(task_type, job_id), (task_type, job_id),
).fetchone() ).fetchone()
if existing: if existing:
conn.close()
return existing["id"], False return existing["id"], False
cursor = conn.execute( cursor = conn.execute(
"INSERT INTO background_tasks (task_type, job_id, params) VALUES (?,?,?)", "INSERT INTO background_tasks (task_type, job_id, params) VALUES (?,?,?)",
(task_type, job_id, params), (task_type, job_id, params),
) )
conn.commit() conn.commit()
task_id = cursor.lastrowid return cursor.lastrowid, True
finally:
conn.close() conn.close()
return task_id, True
def _update_task_status( def _update_task_status(
db_path: Path, task_id: int, status: str, *, error: str = "" db_path: Path, task_id: int, status: str, *, error: str = ""
) -> None: ) -> None:
with sqlite3.connect(db_path) as conn: with get_connection(db_path) as conn:
conn.execute( conn.execute(
"UPDATE background_tasks " "UPDATE background_tasks "
"SET status=?, error=?, updated_at=CURRENT_TIMESTAMP WHERE id=?", "SET status=?, error=?, updated_at=CURRENT_TIMESTAMP WHERE id=?",
@ -107,10 +111,18 @@ def _run_trust_photo_analysis(
listing_id: int, listing_id: int,
params: str | None, params: str | None,
) -> None: ) -> None:
"""Download primary listing photo, run vision LLM, write to trust_scores.""" """Download primary listing photo, run vision LLM, write to trust_scores.
In cloud mode the result must be written to the per-user DB, which differs
from db_path (the scheduler's shared task-queue DB). The enqueue call site
encodes the correct write target as 'user_db' in params; in local mode it
falls back to db_path so the single-DB layout keeps working.
"""
p = json.loads(params or "{}") p = json.loads(params or "{}")
photo_url = p.get("photo_url", "") photo_url = p.get("photo_url", "")
listing_title = p.get("listing_title", "") listing_title = p.get("listing_title", "")
# user_db: per-user DB in cloud mode; same as db_path in local mode.
result_db = Path(p.get("user_db", str(db_path)))
if not photo_url: if not photo_url:
raise ValueError("trust_photo_analysis: 'photo_url' is required in params") raise ValueError("trust_photo_analysis: 'photo_url' is required in params")
@ -144,7 +156,7 @@ def _run_trust_photo_analysis(
) )
analysis = {"raw_response": raw, "parse_error": True} analysis = {"raw_response": raw, "parse_error": True}
with sqlite3.connect(db_path) as conn: with get_connection(result_db) as conn:
conn.execute( conn.execute(
"UPDATE trust_scores SET photo_analysis_json=? WHERE listing_id=?", "UPDATE trust_scores SET photo_analysis_json=? WHERE listing_id=?",
(json.dumps(analysis), listing_id), (json.dumps(analysis), listing_id),

View file

@ -28,6 +28,7 @@ def tmp_db(tmp_path: Path) -> Path:
status TEXT NOT NULL DEFAULT 'queued', status TEXT NOT NULL DEFAULT 'queued',
params TEXT, params TEXT,
error TEXT, error TEXT,
stage TEXT,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
); );