refactor: replace hand-rolled JWT+Heimdall with cf-core CloudSessionFactory
Some checks are pending
CI / Backend (Python) (push) Waiting to run
CI / Frontend (Vue) (push) Waiting to run
Mirror / mirror (push) Waiting to run

Delegates JWT validation, Heimdall provision/tier-resolve, bypass-IP
handling, and guest session management to circuitforge_core. Kiwi keeps
its own CloudUser (db path, household fields, BYOK flag) and DB helpers.
detect_byok() is now imported from cf-core instead of a local copy.
household_id/is_household_owner/license_key flow through core_user.meta
(cf-core already forwards all Heimdall response extras into meta).
Removes ~217 lines of duplicated auth code.

Note: guest cookie name changes from kiwi_guest_id to cf_guest_id (cf-core
managed). Existing guest sessions get a new UUID on first visit — acceptable
for alpha.
This commit is contained in:
pyr0ball 2026-04-25 16:35:56 -07:00
parent b86b7732dc
commit f6b29693c8

View file

@ -1,11 +1,9 @@
"""Cloud session resolution for Kiwi FastAPI. """Cloud session resolution for Kiwi FastAPI.
Local mode (CLOUD_MODE unset/false): returns a local CloudUser with no auth Delegates JWT validation, Heimdall provisioning, tier resolution, and guest
checks, full tier access, and DB path pointing to settings.DB_PATH. session management to circuitforge_core.CloudSessionFactory. Kiwi-specific
CloudUser (per-user DB path, household data, BYOK flag) and DB helpers are
Cloud mode (CLOUD_MODE=true): validates the cf_session JWT injected by Caddy kept here.
as X-CF-Session, resolves user_id, auto-provisions a free Heimdall license on
first visit, fetches the tier, and returns a per-user DB path.
FastAPI usage: FastAPI usage:
@app.get("/api/v1/inventory/items") @app.get("/api/v1/inventory/items")
@ -17,16 +15,10 @@ from __future__ import annotations
import logging import logging
import os import os
import re
import time
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
import uuid from circuitforge_core.cloud_session import CloudSessionFactory as _CoreFactory, detect_byok
import jwt as pyjwt
import requests
import yaml
from fastapi import Depends, HTTPException, Request, Response from fastapi import Depends, HTTPException, Request, Response
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -35,54 +27,13 @@ log = logging.getLogger(__name__)
CLOUD_MODE: bool = os.environ.get("CLOUD_MODE", "").lower() in ("1", "true", "yes") CLOUD_MODE: bool = os.environ.get("CLOUD_MODE", "").lower() in ("1", "true", "yes")
CLOUD_DATA_ROOT: Path = Path(os.environ.get("CLOUD_DATA_ROOT", "/devl/kiwi-cloud-data")) CLOUD_DATA_ROOT: Path = Path(os.environ.get("CLOUD_DATA_ROOT", "/devl/kiwi-cloud-data"))
DIRECTUS_JWT_SECRET: str = os.environ.get("DIRECTUS_JWT_SECRET", "")
HEIMDALL_URL: str = os.environ.get("HEIMDALL_URL", "https://license.circuitforge.tech")
HEIMDALL_ADMIN_TOKEN: str = os.environ.get("HEIMDALL_ADMIN_TOKEN", "")
# Dev bypass: comma-separated IPs or CIDR ranges that skip JWT auth.
# NEVER set this in production. Intended only for LAN developer testing when
# the request doesn't pass through Caddy (which normally injects X-CF-Session).
# Example: CLOUD_AUTH_BYPASS_IPS=10.1.10.0/24,127.0.0.1
import ipaddress as _ipaddress
_BYPASS_RAW: list[str] = [
e.strip()
for e in os.environ.get("CLOUD_AUTH_BYPASS_IPS", "").split(",")
if e.strip()
]
_BYPASS_NETS: list[_ipaddress.IPv4Network | _ipaddress.IPv6Network] = []
_BYPASS_IPS: frozenset[str] = frozenset()
if _BYPASS_RAW:
_nets, _ips = [], set()
for entry in _BYPASS_RAW:
try:
_nets.append(_ipaddress.ip_network(entry, strict=False))
except ValueError:
_ips.add(entry) # treat non-parseable entries as bare IPs
_BYPASS_NETS = _nets
_BYPASS_IPS = frozenset(_ips)
def _is_bypass_ip(ip: str) -> bool:
if not ip:
return False
if ip in _BYPASS_IPS:
return True
try:
addr = _ipaddress.ip_address(ip)
return any(addr in net for net in _BYPASS_NETS)
except ValueError:
return False
_LOCAL_KIWI_DB: Path = Path(os.environ.get("KIWI_DB", "data/kiwi.db")) _LOCAL_KIWI_DB: Path = Path(os.environ.get("KIWI_DB", "data/kiwi.db"))
_TIER_CACHE: dict[str, tuple[dict, float]] = {}
_TIER_CACHE_TTL = 300 # 5 minutes
TIERS = ["free", "paid", "premium", "ultra"] TIERS = ["free", "paid", "premium", "ultra"]
_core = _CoreFactory(product="kiwi", byok_detector=detect_byok)
def _auth_label(user_id: str) -> str: def _auth_label(user_id: str) -> str:
"""Classify a user_id into a short tag for structured log lines. No PII emitted.""" """Classify a user_id into a short tag for structured log lines. No PII emitted."""
@ -106,73 +57,7 @@ class CloudUser:
license_key: str | None = None # key_display for lifetime/founders keys; None for subscription/free license_key: str | None = None # key_display for lifetime/founders keys; None for subscription/free
# ── JWT validation ───────────────────────────────────────────────────────────── # ── DB path helpers ───────────────────────────────────────────────────────────
def _extract_session_token(header_value: str) -> str:
m = re.search(r'(?:^|;)\s*cf_session=([^;]+)', header_value)
return m.group(1).strip() if m else header_value.strip()
def validate_session_jwt(token: str) -> str:
"""Validate cf_session JWT and return the Directus user_id."""
try:
payload = pyjwt.decode(
token,
DIRECTUS_JWT_SECRET,
algorithms=["HS256"],
options={"require": ["id", "exp"]},
)
return payload["id"]
except Exception as exc:
log.debug("JWT validation failed: %s", exc)
raise HTTPException(status_code=401, detail="Session invalid or expired")
# ── Heimdall integration ──────────────────────────────────────────────────────
def _ensure_provisioned(user_id: str) -> None:
if not HEIMDALL_ADMIN_TOKEN:
return
try:
requests.post(
f"{HEIMDALL_URL}/admin/provision",
json={"directus_user_id": user_id, "product": "kiwi", "tier": "free"},
headers={"Authorization": f"Bearer {HEIMDALL_ADMIN_TOKEN}"},
timeout=5,
)
except Exception as exc:
log.warning("Heimdall provision failed for user %s: %s", user_id, exc)
def _fetch_cloud_tier(user_id: str) -> tuple[str, str | None, bool, str | None]:
"""Returns (tier, household_id | None, is_household_owner, license_key | None)."""
now = time.monotonic()
cached = _TIER_CACHE.get(user_id)
if cached and (now - cached[1]) < _TIER_CACHE_TTL:
entry = cached[0]
return entry["tier"], entry.get("household_id"), entry.get("is_household_owner", False), entry.get("license_key")
if not HEIMDALL_ADMIN_TOKEN:
return "free", None, False, None
try:
resp = requests.post(
f"{HEIMDALL_URL}/admin/cloud/resolve",
json={"directus_user_id": user_id, "product": "kiwi"},
headers={"Authorization": f"Bearer {HEIMDALL_ADMIN_TOKEN}"},
timeout=5,
)
data = resp.json() if resp.ok else {}
tier = data.get("tier", "free")
household_id = data.get("household_id")
is_owner = data.get("is_household_owner", False)
license_key = data.get("key_display")
except Exception as exc:
log.warning("Heimdall tier resolve failed for user %s: %s", user_id, exc)
tier, household_id, is_owner, license_key = "free", None, False, None
_TIER_CACHE[user_id] = ({"tier": tier, "household_id": household_id, "is_household_owner": is_owner, "license_key": license_key}, now)
return tier, household_id, is_owner, license_key
def _user_db_path(user_id: str, household_id: str | None = None) -> Path: def _user_db_path(user_id: str, household_id: str | None = None) -> Path:
if household_id: if household_id:
@ -194,112 +79,45 @@ def _anon_guest_db_path(guest_id: str) -> Path:
return path return path
# ── BYOK detection ────────────────────────────────────────────────────────────
_LLM_CONFIG_PATH = Path.home() / ".config" / "circuitforge" / "llm.yaml"
def _detect_byok(config_path: Path = _LLM_CONFIG_PATH) -> bool:
"""Return True if at least one enabled non-vision LLM backend is configured.
Reads the same llm.yaml that LLMRouter uses. Local (Ollama, vLLM) and
API-key backends both count the policy is "user is supplying compute",
regardless of where that compute lives.
"""
try:
with open(config_path) as f:
cfg = yaml.safe_load(f) or {}
return any(
b.get("enabled", True) and b.get("type") != "vision_service"
for b in cfg.get("backends", {}).values()
)
except Exception:
return False
# ── FastAPI dependency ──────────────────────────────────────────────────────── # ── FastAPI dependency ────────────────────────────────────────────────────────
_GUEST_COOKIE = "kiwi_guest_id"
_GUEST_COOKIE_MAX_AGE = 60 * 60 * 24 * 90 # 90 days
def _resolve_guest_session(request: Request, response: Response, has_byok: bool) -> CloudUser:
"""Return a per-session anonymous CloudUser, creating a guest UUID cookie if needed."""
guest_id = request.cookies.get(_GUEST_COOKIE, "").strip()
is_new = not guest_id
if is_new:
guest_id = str(uuid.uuid4())
log.debug("New guest session assigned: anon-%s", guest_id[:8])
# Secure flag only when the request actually arrived over HTTPS
# (Caddy sets X-Forwarded-Proto=https in cloud; absent on direct port access).
# Avoids losing the session cookie on HTTP direct-port testing of the cloud stack.
is_https = request.headers.get("x-forwarded-proto", "http").lower() == "https"
response.set_cookie(
key=_GUEST_COOKIE,
value=guest_id,
max_age=_GUEST_COOKIE_MAX_AGE,
httponly=True,
samesite="lax",
secure=is_https,
)
return CloudUser(
user_id=f"anon-{guest_id}",
tier="free",
db=_anon_guest_db_path(guest_id),
has_byok=has_byok,
)
def get_session(request: Request, response: Response) -> CloudUser: def get_session(request: Request, response: Response) -> CloudUser:
"""FastAPI dependency — resolves the current user from the request. """FastAPI dependency — resolves the current user from the request.
Delegates auth/tier resolution to cf-core CloudSessionFactory, then maps
the result to Kiwi's CloudUser with per-user DB path and household data.
Local mode: fully-privileged "local" user pointing at local DB. Local mode: fully-privileged "local" user pointing at local DB.
Cloud mode: validates X-CF-Session JWT, provisions license, resolves tier. Cloud mode: validates X-CF-Session JWT, provisions license, resolves tier.
Dev bypass: if CLOUD_AUTH_BYPASS_IPS is set and the client IP matches, Dev bypass: CLOUD_AUTH_BYPASS_IPS match returns a "local-dev" session.
returns a "local" session without JWT validation (dev/LAN use only). Anonymous: per-session UUID cookie (cf_guest_id) isolates each guest's data.
Anonymous: per-session UUID cookie isolates each guest visitor's data.
""" """
has_byok = _detect_byok() core_user = _core.resolve(request, response)
uid, tier, has_byok = core_user.user_id, core_user.tier, core_user.has_byok
if not CLOUD_MODE: if not CLOUD_MODE or uid in ("local", "local-dev"):
return CloudUser(user_id="local", tier="local", db=_LOCAL_KIWI_DB, has_byok=has_byok) # local-dev gets a writable path under CLOUD_DATA_ROOT; local uses KIWI_DB
db = _user_db_path(uid) if uid == "local-dev" else _LOCAL_KIWI_DB
return CloudUser(user_id=uid, tier=tier, db=db, has_byok=has_byok)
# Prefer X-Real-IP (set by Caddy from the actual client address) over the if uid.startswith("anon-"):
# TCP peer address (which is nginx's container IP when behind the proxy). guest_id = uid[len("anon-"):]
client_ip = ( return CloudUser(
request.headers.get("x-real-ip", "") user_id=uid, tier=tier,
or (request.client.host if request.client else "") db=_anon_guest_db_path(guest_id),
) has_byok=has_byok,
if (_BYPASS_IPS or _BYPASS_NETS) and _is_bypass_ip(client_ip): )
log.debug("CLOUD_AUTH_BYPASS_IPS match for %s — returning local session", client_ip)
# Use a dev DB under CLOUD_DATA_ROOT so the container has a writable path.
dev_db = _user_db_path("local-dev")
return CloudUser(user_id="local-dev", tier="local", db=dev_db, has_byok=has_byok)
# Resolve cf_session JWT: prefer the explicit header injected by Caddy, then household_id = core_user.meta.get("household_id")
# fall back to the cf_session cookie value. Other cookies (e.g. kiwi_guest_id) is_owner = core_user.meta.get("is_household_owner", False)
# must never be treated as auth tokens. license_key = core_user.meta.get("license_key")
raw_session = request.headers.get("x-cf-session", "").strip() log.debug("Resolved %s session uid=%s tier=%s household=%s", _auth_label(uid), uid[:8], tier, household_id)
if not raw_session:
raw_session = request.cookies.get("cf_session", "").strip()
if not raw_session:
return _resolve_guest_session(request, response, has_byok)
token = _extract_session_token(raw_session) # gitleaks:allow — function name, not a secret
if not token:
return _resolve_guest_session(request, response, has_byok)
user_id = validate_session_jwt(token)
_ensure_provisioned(user_id)
tier, household_id, is_household_owner, license_key = _fetch_cloud_tier(user_id)
return CloudUser( return CloudUser(
user_id=user_id, user_id=uid, tier=tier,
tier=tier, db=_user_db_path(uid, household_id=household_id),
db=_user_db_path(user_id, household_id=household_id),
has_byok=has_byok, has_byok=has_byok,
household_id=household_id, household_id=household_id,
is_household_owner=is_household_owner, is_household_owner=is_owner,
license_key=license_key, license_key=license_key,
) )