kiwi/app/cloud_session.py

249 lines
9.4 KiB
Python

"""Cloud session resolution for Kiwi FastAPI.
Local mode (CLOUD_MODE unset/false): returns a local CloudUser with no auth
checks, full tier access, and DB path pointing to settings.DB_PATH.
Cloud mode (CLOUD_MODE=true): validates the cf_session JWT injected by Caddy
as X-CF-Session, resolves user_id, auto-provisions a free Heimdall license on
first visit, fetches the tier, and returns a per-user DB path.
FastAPI usage:
@app.get("/api/v1/inventory/items")
def list_items(session: CloudUser = Depends(get_session)):
store = Store(session.db)
...
"""
from __future__ import annotations
import logging
import os
import re
import time
from dataclasses import dataclass
from pathlib import Path
import jwt as pyjwt
import requests
import yaml
from fastapi import Depends, HTTPException, Request
log = logging.getLogger(__name__)
# ── Config ────────────────────────────────────────────────────────────────────
CLOUD_MODE: bool = os.environ.get("CLOUD_MODE", "").lower() in ("1", "true", "yes")
CLOUD_DATA_ROOT: Path = Path(os.environ.get("CLOUD_DATA_ROOT", "/devl/kiwi-cloud-data"))
DIRECTUS_JWT_SECRET: str = os.environ.get("DIRECTUS_JWT_SECRET", "")
HEIMDALL_URL: str = os.environ.get("HEIMDALL_URL", "https://license.circuitforge.tech")
HEIMDALL_ADMIN_TOKEN: str = os.environ.get("HEIMDALL_ADMIN_TOKEN", "")
# Dev bypass: comma-separated IPs or CIDR ranges that skip JWT auth.
# NEVER set this in production. Intended only for LAN developer testing when
# the request doesn't pass through Caddy (which normally injects X-CF-Session).
# Example: CLOUD_AUTH_BYPASS_IPS=10.1.10.0/24,127.0.0.1
import ipaddress as _ipaddress
_BYPASS_RAW: list[str] = [
e.strip()
for e in os.environ.get("CLOUD_AUTH_BYPASS_IPS", "").split(",")
if e.strip()
]
_BYPASS_NETS: list[_ipaddress.IPv4Network | _ipaddress.IPv6Network] = []
_BYPASS_IPS: frozenset[str] = frozenset()
if _BYPASS_RAW:
_nets, _ips = [], set()
for entry in _BYPASS_RAW:
try:
_nets.append(_ipaddress.ip_network(entry, strict=False))
except ValueError:
_ips.add(entry) # treat non-parseable entries as bare IPs
_BYPASS_NETS = _nets
_BYPASS_IPS = frozenset(_ips)
def _is_bypass_ip(ip: str) -> bool:
if not ip:
return False
if ip in _BYPASS_IPS:
return True
try:
addr = _ipaddress.ip_address(ip)
return any(addr in net for net in _BYPASS_NETS)
except ValueError:
return False
_LOCAL_KIWI_DB: Path = Path(os.environ.get("KIWI_DB", "data/kiwi.db"))
_TIER_CACHE: dict[str, tuple[str, float]] = {}
_TIER_CACHE_TTL = 300 # 5 minutes
TIERS = ["free", "paid", "premium", "ultra"]
# ── Domain ────────────────────────────────────────────────────────────────────
@dataclass(frozen=True)
class CloudUser:
user_id: str # Directus UUID, or "local"
tier: str # free | paid | premium | ultra | local
db: Path # per-user SQLite DB path
has_byok: bool # True if a configured LLM backend is present in llm.yaml
# ── JWT validation ─────────────────────────────────────────────────────────────
def _extract_session_token(header_value: str) -> str:
m = re.search(r'(?:^|;)\s*cf_session=([^;]+)', header_value)
return m.group(1).strip() if m else header_value.strip()
def validate_session_jwt(token: str) -> str:
"""Validate cf_session JWT and return the Directus user_id."""
try:
payload = pyjwt.decode(
token,
DIRECTUS_JWT_SECRET,
algorithms=["HS256"],
options={"require": ["id", "exp"]},
)
return payload["id"]
except Exception as exc:
log.debug("JWT validation failed: %s", exc)
raise HTTPException(status_code=401, detail="Session invalid or expired")
# ── Heimdall integration ──────────────────────────────────────────────────────
def _ensure_provisioned(user_id: str) -> None:
if not HEIMDALL_ADMIN_TOKEN:
return
try:
requests.post(
f"{HEIMDALL_URL}/admin/provision",
json={"directus_user_id": user_id, "product": "kiwi", "tier": "free"},
headers={"Authorization": f"Bearer {HEIMDALL_ADMIN_TOKEN}"},
timeout=5,
)
except Exception as exc:
log.warning("Heimdall provision failed for user %s: %s", user_id, exc)
def _fetch_cloud_tier(user_id: str) -> str:
now = time.monotonic()
cached = _TIER_CACHE.get(user_id)
if cached and (now - cached[1]) < _TIER_CACHE_TTL:
return cached[0]
if not HEIMDALL_ADMIN_TOKEN:
return "free"
try:
resp = requests.post(
f"{HEIMDALL_URL}/admin/cloud/resolve",
json={"directus_user_id": user_id, "product": "kiwi"},
headers={"Authorization": f"Bearer {HEIMDALL_ADMIN_TOKEN}"},
timeout=5,
)
tier = resp.json().get("tier", "free") if resp.ok else "free"
except Exception as exc:
log.warning("Heimdall tier resolve failed for user %s: %s", user_id, exc)
tier = "free"
_TIER_CACHE[user_id] = (tier, now)
return tier
def _user_db_path(user_id: str) -> Path:
path = CLOUD_DATA_ROOT / user_id / "kiwi.db"
path.parent.mkdir(parents=True, exist_ok=True)
return path
# ── BYOK detection ────────────────────────────────────────────────────────────
_LLM_CONFIG_PATH = Path.home() / ".config" / "circuitforge" / "llm.yaml"
def _detect_byok(config_path: Path = _LLM_CONFIG_PATH) -> bool:
"""Return True if at least one enabled non-vision LLM backend is configured.
Reads the same llm.yaml that LLMRouter uses. Local (Ollama, vLLM) and
API-key backends both count — the policy is "user is supplying compute",
regardless of where that compute lives.
"""
try:
with open(config_path) as f:
cfg = yaml.safe_load(f) or {}
return any(
b.get("enabled", True) and b.get("type") != "vision_service"
for b in cfg.get("backends", {}).values()
)
except Exception:
return False
# ── FastAPI dependency ────────────────────────────────────────────────────────
def get_session(request: Request) -> CloudUser:
"""FastAPI dependency — resolves the current user from the request.
Local mode: fully-privileged "local" user pointing at local DB.
Cloud mode: validates X-CF-Session JWT, provisions license, resolves tier.
Dev bypass: if CLOUD_AUTH_BYPASS_IPS is set and the client IP matches,
returns a "local" session without JWT validation (dev/LAN use only).
"""
has_byok = _detect_byok()
if not CLOUD_MODE:
return CloudUser(user_id="local", tier="local", db=_LOCAL_KIWI_DB, has_byok=has_byok)
# Prefer X-Real-IP (set by nginx from the actual client address) over the
# TCP peer address (which is nginx's container IP when behind the proxy).
# Prefer X-Real-IP (set by nginx from the actual client address) over the
# TCP peer address (which is nginx's container IP when behind the proxy).
client_ip = (
request.headers.get("x-real-ip", "")
or (request.client.host if request.client else "")
)
if (_BYPASS_IPS or _BYPASS_NETS) and _is_bypass_ip(client_ip):
log.debug("CLOUD_AUTH_BYPASS_IPS match for %s — returning local session", client_ip)
# Use a dev DB under CLOUD_DATA_ROOT so the container has a writable path.
dev_db = _user_db_path("local-dev")
return CloudUser(user_id="local-dev", tier="local", db=dev_db, has_byok=has_byok)
raw_header = (
request.headers.get("x-cf-session", "")
or request.headers.get("cookie", "")
)
if not raw_header:
raise HTTPException(status_code=401, detail="Not authenticated")
token = _extract_session_token(raw_header) # gitleaks:allow — function name, not a secret
if not token:
raise HTTPException(status_code=401, detail="Not authenticated")
user_id = validate_session_jwt(token)
_ensure_provisioned(user_id)
tier = _fetch_cloud_tier(user_id)
return CloudUser(user_id=user_id, tier=tier, db=_user_db_path(user_id), has_byok=has_byok)
def require_tier(min_tier: str):
"""Dependency factory — raises 403 if tier is below min_tier."""
min_idx = TIERS.index(min_tier)
def _check(session: CloudUser = Depends(get_session)) -> CloudUser:
if session.tier == "local":
return session
try:
if TIERS.index(session.tier) < min_idx:
raise HTTPException(
status_code=403,
detail=f"This feature requires {min_tier} tier or above.",
)
except ValueError:
raise HTTPException(status_code=403, detail="Unknown tier.")
return session
return _check