Delegates JWT validation, Heimdall provision/tier-resolve, bypass-IP handling, and guest session management to circuitforge_core. Snipe keeps its own CloudUser (shared_db + user_db), SessionFeatures, compute_features, and DB path helpers. Removes ~158 lines of duplicated auth code. Note: get_session() now takes (Request, Response) — FastAPI auto-injects both, no call-site changes needed.
150 lines
5.7 KiB
Python
150 lines
5.7 KiB
Python
"""Cloud session resolution for Snipe FastAPI.
|
|
|
|
Delegates JWT validation, Heimdall provisioning, tier resolution, and guest
|
|
session management to circuitforge_core.CloudSessionFactory. Snipe-specific
|
|
CloudUser (shared_db + user_db paths), SessionFeatures, and DB helpers are
|
|
kept here.
|
|
|
|
FastAPI usage:
|
|
@app.get("/api/search")
|
|
def search(session: CloudUser = Depends(get_session)):
|
|
shared_store = Store(session.shared_db)
|
|
user_store = Store(session.user_db)
|
|
...
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from circuitforge_core.cloud_session import CloudSessionFactory as _CoreFactory
|
|
from fastapi import Depends, HTTPException, Request, Response
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
# ── Config ────────────────────────────────────────────────────────────────────
|
|
|
|
CLOUD_MODE: bool = os.environ.get("CLOUD_MODE", "").lower() in ("1", "true", "yes")
|
|
CLOUD_DATA_ROOT: Path = Path(os.environ.get("CLOUD_DATA_ROOT", "/devl/snipe-cloud-data"))
|
|
|
|
_LOCAL_SNIPE_DB: Path = Path(os.environ.get("SNIPE_DB", "data/snipe.db"))
|
|
|
|
TIERS = ["free", "paid", "premium", "ultra"]
|
|
|
|
_core = _CoreFactory(product="snipe")
|
|
|
|
|
|
# ── Domain ────────────────────────────────────────────────────────────────────
|
|
|
|
@dataclass(frozen=True)
|
|
class CloudUser:
|
|
user_id: str # Directus UUID, or "local" in local mode
|
|
tier: str # free | paid | premium | ultra | local
|
|
shared_db: Path # sellers, market_comps — shared across all users
|
|
user_db: Path # listings, saved_searches, trust_scores — per-user
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class SessionFeatures:
|
|
saved_searches: bool
|
|
saved_searches_limit: Optional[int] # None = unlimited
|
|
background_monitoring: bool
|
|
max_pages: int
|
|
upc_search: bool
|
|
photo_analysis: bool
|
|
shared_scammer_db: bool
|
|
shared_image_db: bool
|
|
llm_query_builder: bool
|
|
|
|
|
|
def compute_features(tier: str) -> SessionFeatures:
|
|
"""Compute feature flags from tier. Evaluated server-side; sent to frontend."""
|
|
local = tier == "local"
|
|
paid_plus = local or tier in ("paid", "premium", "ultra")
|
|
|
|
return SessionFeatures(
|
|
saved_searches=True, # all tiers get saved searches
|
|
saved_searches_limit=None if paid_plus else 3,
|
|
background_monitoring=paid_plus,
|
|
max_pages=999 if local else (5 if paid_plus else 1),
|
|
upc_search=paid_plus,
|
|
photo_analysis=paid_plus,
|
|
shared_scammer_db=paid_plus,
|
|
shared_image_db=paid_plus,
|
|
llm_query_builder=paid_plus,
|
|
)
|
|
|
|
|
|
# ── DB path helpers ───────────────────────────────────────────────────────────
|
|
|
|
def _shared_db_path() -> Path:
|
|
path = CLOUD_DATA_ROOT / "shared" / "shared.db"
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
return path
|
|
|
|
|
|
def _user_db_path(user_id: str) -> Path:
|
|
path = CLOUD_DATA_ROOT / user_id / "snipe" / "user.db"
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
return path
|
|
|
|
|
|
def _anon_db_path() -> Path:
|
|
"""Shared pool DB for unauthenticated visitors.
|
|
|
|
All anonymous searches write listing data here. Seller and market comp
|
|
data accumulates in shared_db as normal, growing the anti-scammer corpus
|
|
with every public search regardless of auth state.
|
|
"""
|
|
path = CLOUD_DATA_ROOT / "anonymous" / "snipe" / "user.db"
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
return path
|
|
|
|
|
|
# ── FastAPI dependency ────────────────────────────────────────────────────────
|
|
|
|
def get_session(request: Request, response: Response) -> CloudUser:
|
|
"""FastAPI dependency — resolves the current user from the request.
|
|
|
|
Delegates auth/tier resolution to cf-core CloudSessionFactory, then maps
|
|
the result to Snipe's CloudUser with shared_db + user_db paths.
|
|
|
|
Local mode: fully-privileged "local" user pointing at SNIPE_DB.
|
|
Cloud mode: validates X-CF-Session JWT, provisions Heimdall license,
|
|
resolves tier, returns per-user DB paths.
|
|
Anonymous: guest session with free-tier access to shared scammer corpus.
|
|
"""
|
|
core_user = _core.resolve(request, response)
|
|
uid, tier = core_user.user_id, core_user.tier
|
|
|
|
if not CLOUD_MODE or uid in ("local", "local-dev"):
|
|
return CloudUser(user_id=uid, tier=tier, shared_db=_LOCAL_SNIPE_DB, user_db=_LOCAL_SNIPE_DB)
|
|
if uid.startswith("anon-"):
|
|
return CloudUser(user_id=uid, tier=tier, shared_db=_shared_db_path(), user_db=_anon_db_path())
|
|
return CloudUser(user_id=uid, tier=tier, shared_db=_shared_db_path(), user_db=_user_db_path(uid))
|
|
|
|
|
|
def require_tier(min_tier: str):
|
|
"""Dependency factory — raises 403 if the session tier is below min_tier.
|
|
|
|
Usage: @app.post("/api/foo", dependencies=[Depends(require_tier("paid"))])
|
|
"""
|
|
min_idx = TIERS.index(min_tier)
|
|
|
|
def _check(session: CloudUser = Depends(get_session)) -> CloudUser:
|
|
if session.tier == "local":
|
|
return session # local users always pass
|
|
try:
|
|
if TIERS.index(session.tier) < min_idx:
|
|
raise HTTPException(
|
|
status_code=403,
|
|
detail=f"This feature requires {min_tier} tier or above.",
|
|
)
|
|
except ValueError:
|
|
raise HTTPException(status_code=403, detail="Unknown tier.")
|
|
return session
|
|
|
|
return _check
|