peregrine/app/cloud_session.py
pyr0ball 9603d591a3 fix(cloud): use per-user config dir for wizard gate; redirect on invalid session
- app.py: wizard gate now reads get_config_dir()/user.yaml instead of
  hardcoded repo-level config/ — fixes perpetual onboarding loop in
  cloud mode where per-user wizard_complete was never seen
- app.py: page title corrected to "Peregrine"
- cloud_session.py: add get_config_dir() returning per-user config path
  in cloud mode, repo config/ locally
- cloud_session.py: replace st.error() with JS redirect on missing/invalid
  session token so users land on login page instead of error screen
- Home.py, 4_Apply.py, migrate.py: remove remaining AIHawk UI references
2026-03-13 11:24:42 -07:00

171 lines
6 KiB
Python

# peregrine/app/cloud_session.py
"""
Cloud session middleware for multi-tenant Peregrine deployment.
In local-first mode (CLOUD_MODE unset or false), all functions are no-ops.
In cloud mode (CLOUD_MODE=true), resolves the Directus session JWT from the
X-CF-Session header, validates it, and injects user_id + db_path into
st.session_state.
All Peregrine pages call get_db_path() instead of DEFAULT_DB directly to
transparently support both local and cloud deployments.
"""
import logging
import os
import re
import hmac
import hashlib
from pathlib import Path
import requests
import streamlit as st
from scripts.db import DEFAULT_DB
log = logging.getLogger(__name__)
CLOUD_MODE: bool = os.environ.get("CLOUD_MODE", "").lower() in ("1", "true", "yes")
CLOUD_DATA_ROOT: Path = Path(os.environ.get("CLOUD_DATA_ROOT", "/devl/menagerie-data"))
DIRECTUS_JWT_SECRET: str = os.environ.get("DIRECTUS_JWT_SECRET", "")
SERVER_SECRET: str = os.environ.get("CF_SERVER_SECRET", "")
# Heimdall license server — internal URL preferred when running on the same host
HEIMDALL_URL: str = os.environ.get("HEIMDALL_URL", "https://license.circuitforge.tech")
HEIMDALL_ADMIN_TOKEN: str = os.environ.get("HEIMDALL_ADMIN_TOKEN", "")
def _extract_session_token(cookie_header: str) -> str:
"""Extract cf_session value from a Cookie header string."""
m = re.search(r'(?:^|;)\s*cf_session=([^;]+)', cookie_header)
return m.group(1).strip() if m else ""
@st.cache_data(ttl=300, show_spinner=False)
def _fetch_cloud_tier(user_id: str, product: str) -> str:
"""Call Heimdall to resolve the current cloud tier for this user.
Cached per (user_id, product) for 5 minutes to avoid hammering Heimdall
on every Streamlit rerun. Returns "free" on any error so the app degrades
gracefully rather than blocking the user.
"""
if not HEIMDALL_ADMIN_TOKEN:
log.warning("HEIMDALL_ADMIN_TOKEN not set — defaulting tier to free")
return "free"
try:
resp = requests.post(
f"{HEIMDALL_URL}/admin/cloud/resolve",
json={"user_id": user_id, "product": product},
headers={"Authorization": f"Bearer {HEIMDALL_ADMIN_TOKEN}"},
timeout=5,
)
if resp.status_code == 200:
return resp.json().get("tier", "free")
if resp.status_code == 404:
# No cloud key yet — user signed up before provision ran; return free.
return "free"
log.warning("Heimdall resolve returned %s — defaulting tier to free", resp.status_code)
except Exception as exc:
log.warning("Heimdall tier resolve failed: %s — defaulting to free", exc)
return "free"
def validate_session_jwt(token: str) -> str:
"""Validate a Directus session JWT and return the user UUID. Raises on failure."""
import jwt # PyJWT — lazy import so local mode never needs it
payload = jwt.decode(token, DIRECTUS_JWT_SECRET, algorithms=["HS256"])
user_id = payload.get("id") or payload.get("sub")
if not user_id:
raise ValueError("JWT missing user id claim")
return user_id
def _user_data_path(user_id: str, app: str) -> Path:
return CLOUD_DATA_ROOT / user_id / app
def derive_db_key(user_id: str) -> str:
"""Derive a per-user SQLCipher encryption key from the server secret."""
return hmac.new(
SERVER_SECRET.encode(),
user_id.encode(),
hashlib.sha256,
).hexdigest()
def resolve_session(app: str = "peregrine") -> None:
"""
Call at the top of each Streamlit page.
In local mode: no-op.
In cloud mode: reads X-CF-Session header, validates JWT, creates user
data directory on first visit, and sets st.session_state keys:
- user_id: str
- db_path: Path
- db_key: str (SQLCipher key for this user)
- cloud_tier: str (free | paid | premium | ultra — resolved from Heimdall)
Idempotent — skips if user_id already in session_state.
"""
if not CLOUD_MODE:
return
if st.session_state.get("user_id"):
return
cookie_header = st.context.headers.get("x-cf-session", "")
session_jwt = _extract_session_token(cookie_header)
if not session_jwt:
st.components.v1.html(
'<script>window.top.location.href = "https://circuitforge.tech/login";</script>',
height=0,
)
st.stop()
try:
user_id = validate_session_jwt(session_jwt)
except Exception:
st.components.v1.html(
'<script>window.top.location.href = "https://circuitforge.tech/login";</script>',
height=0,
)
st.stop()
user_path = _user_data_path(user_id, app)
user_path.mkdir(parents=True, exist_ok=True)
(user_path / "config").mkdir(exist_ok=True)
(user_path / "data").mkdir(exist_ok=True)
st.session_state["user_id"] = user_id
st.session_state["db_path"] = user_path / "staging.db"
st.session_state["db_key"] = derive_db_key(user_id)
st.session_state["cloud_tier"] = _fetch_cloud_tier(user_id, app)
def get_db_path() -> Path:
"""
Return the active db_path for this session.
Cloud: user-scoped path from session_state.
Local: DEFAULT_DB (from STAGING_DB env var or repo default).
"""
return st.session_state.get("db_path", DEFAULT_DB)
def get_config_dir() -> Path:
"""
Return the config directory for this session.
Cloud: per-user path (<data_root>/<user_id>/peregrine/config/) so each
user's YAML files (user.yaml, plain_text_resume.yaml, etc.) are
isolated and never shared across tenants.
Local: repo-level config/ directory.
"""
if CLOUD_MODE and st.session_state.get("db_path"):
return Path(st.session_state["db_path"]).parent / "config"
return Path(__file__).parent.parent.parent / "config"
def get_cloud_tier() -> str:
"""
Return the current user's cloud tier.
Cloud mode: resolved from Heimdall at session start (cached 5 min).
Local mode: always returns "local" so pages can distinguish self-hosted from cloud.
"""
if not CLOUD_MODE:
return "local"
return st.session_state.get("cloud_tier", "free")