feat: wire community module + corrections router (#31 #32 #33)

Corrections (#31):
- Add 010_corrections.sql migration (from cf-core CORRECTIONS_MIGRATION_SQL)
- Wire make_corrections_router() at /api/corrections (shared_db, product='snipe')
- get_shared_db() dependency aggregates corrections across all cloud users

Community module (#32 #33):
- Init SnipeCommunityStore at startup when COMMUNITY_DB_URL is set
- Graceful skip if COMMUNITY_DB_URL is unset (local mode, community disabled)
- add_to_blocklist() publishes confirmed_scam=True seller_trust signal to
  community postgres on every manual blocklist addition (fire-and-forget)
- BlocklistAdd gains flags[] field so active red-flag keys travel with signal

cf-orch community postgres (cf-orch#36) + cf-core module (cf-core#47) both merged.
This commit is contained in:
pyr0ball 2026-04-14 08:33:00 -07:00
parent 72b86834d8
commit 3a4b33d5dd
2 changed files with 99 additions and 0 deletions

View file

@ -17,6 +17,7 @@ from pathlib import Path
from typing import Optional
from circuitforge_core.affiliates import wrap_url as _wrap_affiliate_url
from circuitforge_core.api import make_corrections_router as _make_corrections_router
from circuitforge_core.api import make_feedback_router as _make_feedback_router
from circuitforge_core.config import load_env
from fastapi import Depends, FastAPI, File, HTTPException, Request, Response, UploadFile
@ -46,8 +47,19 @@ log = logging.getLogger(__name__)
_update_queues: dict[str, _queue.SimpleQueue] = {}
# ── Community DB (optional — only active when COMMUNITY_DB_URL is set) ────────
# Holds SnipeCommunityStore at module level so endpoints can publish signals
# without constructing a new connection pool on every request.
_community_store: "SnipeCommunityStore | None" = None
def _get_community_store() -> "SnipeCommunityStore | None":
return _community_store
@asynccontextmanager
async def _lifespan(app: FastAPI):
global _community_store
# Start vision/LLM background task scheduler.
# background_tasks queue lives in shared_db (cloud) or local_db (local)
# so the scheduler has a single stable DB path across all cloud users.
@ -56,10 +68,33 @@ async def _lifespan(app: FastAPI):
sched_db = _shared_db_path() if CLOUD_MODE else _LOCAL_SNIPE_DB
get_scheduler(sched_db)
log.info("Snipe task scheduler started (db=%s)", sched_db)
# Community DB — optional. Skipped gracefully if COMMUNITY_DB_URL is unset.
community_db_url = os.environ.get("COMMUNITY_DB_URL", "")
if community_db_url:
try:
from circuitforge_core.community import CommunityDB
from circuitforge_core.community.snipe_store import SnipeCommunityStore
_cdb = CommunityDB(community_db_url)
_cdb.run_migrations()
_community_store = SnipeCommunityStore(_cdb, source_product="snipe")
log.info("Community DB connected — seller trust signals enabled.")
except Exception:
log.warning("Community DB unavailable — seller trust signals disabled.", exc_info=True)
else:
log.debug("COMMUNITY_DB_URL not set — community trust signals disabled.")
yield
get_scheduler(sched_db).shutdown(timeout=10.0)
reset_scheduler()
log.info("Snipe task scheduler stopped.")
if _community_store is not None:
try:
_community_store._db.close()
except Exception:
pass
_community_store = None
def _ebay_creds() -> tuple[str, str, str]:
@ -95,6 +130,28 @@ _feedback_router = _make_feedback_router(
app.include_router(_feedback_router, prefix="/api/feedback")
def _get_shared_db():
"""FastAPI dependency — yields a sqlite3.Connection to the shared DB.
Corrections (LLM feedback) are stored in shared_db so they aggregate
across all cloud users rather than being siloed per-user.
Used by make_corrections_router.
"""
import sqlite3
from api.cloud_session import CLOUD_MODE, _LOCAL_SNIPE_DB, _shared_db_path
db_path = _shared_db_path() if CLOUD_MODE else _LOCAL_SNIPE_DB
conn = sqlite3.connect(str(db_path), check_same_thread=False)
conn.row_factory = sqlite3.Row
try:
yield conn
finally:
conn.close()
_corrections_router = _make_corrections_router(get_db=_get_shared_db, product="snipe")
app.include_router(_corrections_router, prefix="/api/corrections")
@app.get("/api/health")
def health():
return {"status": "ok"}
@ -753,6 +810,7 @@ class BlocklistAdd(BaseModel):
platform_seller_id: str
username: str
reason: str = ""
flags: list[str] = [] # red-flag keys active at block time — forwarded to community signal
@app.get("/api/blocklist")
@ -776,6 +834,24 @@ def add_to_blocklist(body: BlocklistAdd, session: CloudUser = Depends(get_sessio
reason=body.reason or None,
source="manual",
))
# Publish seller trust signal to community DB (fire-and-forget; never fails the request).
cs = _get_community_store()
if cs is not None:
try:
cs.publish_seller_signal(
platform_seller_id=body.platform_seller_id,
confirmed_scam=True,
signal_source="blocklist_add",
flags=body.flags or [],
platform=body.platform,
)
except Exception:
log.warning(
"Failed to publish seller signal for %s — continuing.", body.platform_seller_id,
exc_info=True,
)
return dataclasses.asdict(entry)

View file

@ -0,0 +1,23 @@
-- LLM output corrections for SFT training pipeline (cf-core make_corrections_router).
-- Stores thumbs-up/down feedback and explicit corrections on LLM-generated content.
-- Used once #29 (LLM query builder) ships; table is safe to pre-create now.
CREATE TABLE IF NOT EXISTS corrections (
id INTEGER PRIMARY KEY AUTOINCREMENT,
item_id TEXT NOT NULL DEFAULT '',
product TEXT NOT NULL,
correction_type TEXT NOT NULL,
input_text TEXT NOT NULL,
original_output TEXT NOT NULL,
corrected_output TEXT NOT NULL DEFAULT '',
rating TEXT NOT NULL DEFAULT 'down',
context TEXT NOT NULL DEFAULT '{}',
opted_in INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_corrections_product
ON corrections (product);
CREATE INDEX IF NOT EXISTS idx_corrections_opted_in
ON corrections (opted_in);