From 3a4b33d5dd80a4ba6d1e9b97e31955f6b435cc98 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Tue, 14 Apr 2026 08:33:00 -0700 Subject: [PATCH] feat: wire community module + corrections router (#31 #32 #33) Corrections (#31): - Add 010_corrections.sql migration (from cf-core CORRECTIONS_MIGRATION_SQL) - Wire make_corrections_router() at /api/corrections (shared_db, product='snipe') - get_shared_db() dependency aggregates corrections across all cloud users Community module (#32 #33): - Init SnipeCommunityStore at startup when COMMUNITY_DB_URL is set - Graceful skip if COMMUNITY_DB_URL is unset (local mode, community disabled) - add_to_blocklist() publishes confirmed_scam=True seller_trust signal to community postgres on every manual blocklist addition (fire-and-forget) - BlocklistAdd gains flags[] field so active red-flag keys travel with signal cf-orch community postgres (cf-orch#36) + cf-core module (cf-core#47) both merged. --- api/main.py | 76 +++++++++++++++++++++++++++ app/db/migrations/010_corrections.sql | 23 ++++++++ 2 files changed, 99 insertions(+) create mode 100644 app/db/migrations/010_corrections.sql diff --git a/api/main.py b/api/main.py index 819f18e..43064d2 100644 --- a/api/main.py +++ b/api/main.py @@ -17,6 +17,7 @@ from pathlib import Path from typing import Optional from circuitforge_core.affiliates import wrap_url as _wrap_affiliate_url +from circuitforge_core.api import make_corrections_router as _make_corrections_router from circuitforge_core.api import make_feedback_router as _make_feedback_router from circuitforge_core.config import load_env from fastapi import Depends, FastAPI, File, HTTPException, Request, Response, UploadFile @@ -46,8 +47,19 @@ log = logging.getLogger(__name__) _update_queues: dict[str, _queue.SimpleQueue] = {} +# ── Community DB (optional — only active when COMMUNITY_DB_URL is set) ──────── +# Holds SnipeCommunityStore at module level so endpoints can publish signals +# without constructing a new connection pool on every request. +_community_store: "SnipeCommunityStore | None" = None + + +def _get_community_store() -> "SnipeCommunityStore | None": + return _community_store + + @asynccontextmanager async def _lifespan(app: FastAPI): + global _community_store # Start vision/LLM background task scheduler. # background_tasks queue lives in shared_db (cloud) or local_db (local) # so the scheduler has a single stable DB path across all cloud users. @@ -56,10 +68,33 @@ async def _lifespan(app: FastAPI): sched_db = _shared_db_path() if CLOUD_MODE else _LOCAL_SNIPE_DB get_scheduler(sched_db) log.info("Snipe task scheduler started (db=%s)", sched_db) + + # Community DB — optional. Skipped gracefully if COMMUNITY_DB_URL is unset. + community_db_url = os.environ.get("COMMUNITY_DB_URL", "") + if community_db_url: + try: + from circuitforge_core.community import CommunityDB + from circuitforge_core.community.snipe_store import SnipeCommunityStore + _cdb = CommunityDB(community_db_url) + _cdb.run_migrations() + _community_store = SnipeCommunityStore(_cdb, source_product="snipe") + log.info("Community DB connected — seller trust signals enabled.") + except Exception: + log.warning("Community DB unavailable — seller trust signals disabled.", exc_info=True) + else: + log.debug("COMMUNITY_DB_URL not set — community trust signals disabled.") + yield + get_scheduler(sched_db).shutdown(timeout=10.0) reset_scheduler() log.info("Snipe task scheduler stopped.") + if _community_store is not None: + try: + _community_store._db.close() + except Exception: + pass + _community_store = None def _ebay_creds() -> tuple[str, str, str]: @@ -95,6 +130,28 @@ _feedback_router = _make_feedback_router( app.include_router(_feedback_router, prefix="/api/feedback") +def _get_shared_db(): + """FastAPI dependency — yields a sqlite3.Connection to the shared DB. + + Corrections (LLM feedback) are stored in shared_db so they aggregate + across all cloud users rather than being siloed per-user. + Used by make_corrections_router. + """ + import sqlite3 + from api.cloud_session import CLOUD_MODE, _LOCAL_SNIPE_DB, _shared_db_path + db_path = _shared_db_path() if CLOUD_MODE else _LOCAL_SNIPE_DB + conn = sqlite3.connect(str(db_path), check_same_thread=False) + conn.row_factory = sqlite3.Row + try: + yield conn + finally: + conn.close() + + +_corrections_router = _make_corrections_router(get_db=_get_shared_db, product="snipe") +app.include_router(_corrections_router, prefix="/api/corrections") + + @app.get("/api/health") def health(): return {"status": "ok"} @@ -753,6 +810,7 @@ class BlocklistAdd(BaseModel): platform_seller_id: str username: str reason: str = "" + flags: list[str] = [] # red-flag keys active at block time — forwarded to community signal @app.get("/api/blocklist") @@ -776,6 +834,24 @@ def add_to_blocklist(body: BlocklistAdd, session: CloudUser = Depends(get_sessio reason=body.reason or None, source="manual", )) + + # Publish seller trust signal to community DB (fire-and-forget; never fails the request). + cs = _get_community_store() + if cs is not None: + try: + cs.publish_seller_signal( + platform_seller_id=body.platform_seller_id, + confirmed_scam=True, + signal_source="blocklist_add", + flags=body.flags or [], + platform=body.platform, + ) + except Exception: + log.warning( + "Failed to publish seller signal for %s — continuing.", body.platform_seller_id, + exc_info=True, + ) + return dataclasses.asdict(entry) diff --git a/app/db/migrations/010_corrections.sql b/app/db/migrations/010_corrections.sql new file mode 100644 index 0000000..4aa057d --- /dev/null +++ b/app/db/migrations/010_corrections.sql @@ -0,0 +1,23 @@ +-- LLM output corrections for SFT training pipeline (cf-core make_corrections_router). +-- Stores thumbs-up/down feedback and explicit corrections on LLM-generated content. +-- Used once #29 (LLM query builder) ships; table is safe to pre-create now. + +CREATE TABLE IF NOT EXISTS corrections ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + item_id TEXT NOT NULL DEFAULT '', + product TEXT NOT NULL, + correction_type TEXT NOT NULL, + input_text TEXT NOT NULL, + original_output TEXT NOT NULL, + corrected_output TEXT NOT NULL DEFAULT '', + rating TEXT NOT NULL DEFAULT 'down', + context TEXT NOT NULL DEFAULT '{}', + opted_in INTEGER NOT NULL DEFAULT 0, + created_at TEXT NOT NULL DEFAULT (datetime('now')) +); + +CREATE INDEX IF NOT EXISTS idx_corrections_product + ON corrections (product); + +CREATE INDEX IF NOT EXISTS idx_corrections_opted_in + ON corrections (opted_in);