feat(cloud_session): add session resolution + forward user_id to cf-orch imitate

app/cloud_session.py:
- Thin wrapper around cf_core.cloud_session.CloudSessionFactory
- BYOK detection reads ~/.config/circuitforge/llm.yaml (same path as other products)
- get_session: FastAPI dependency, returns CloudUser (user_id, tier, has_byok)
- require_tier: dependency factory for tier-gated routes

app/imitate.py:
- _run_cftext gains user_id: str | None param; non-None values included in
  the cf-orch ServiceAllocateRequest so premium users get their custom models
- run_imitate injects session via Depends(_get_imitate_session); extracts user_id,
  filters out local/anon sessions (they get the shared catalog), passes real
  cloud user_id to the ThreadPoolExecutor fanout
- _get_imitate_session wraps get_session with a try/except so imitate keeps
  working in envs where cloud_session deps aren't installed
This commit is contained in:
pyr0ball 2026-04-24 16:41:45 -07:00
parent 5a0ba92fc6
commit 2891606765
2 changed files with 73 additions and 2 deletions

51
app/cloud_session.py Normal file
View file

@ -0,0 +1,51 @@
"""
Avocet cloud session thin wrapper around cf_core.cloud_session.
Usage in FastAPI routes:
from app.cloud_session import get_session, require_tier, CloudUser
from fastapi import Depends
@router.get("/api/imitate")
def imitate(session: CloudUser = Depends(get_session)):
# session.user_id — Directus UUID (cloud) or "local" (self-hosted)
# session.tier — free | paid | premium | ultra | local
# session.has_byok — True if user has a configured LLM backend
...
@router.post("/api/custom-models")
def list_custom_models(session: CloudUser = Depends(require_tier("premium"))):
...
"""
from __future__ import annotations
import os
from pathlib import Path
from circuitforge_core.cloud_session import CloudSessionFactory, CloudUser
__all__ = ["CloudUser", "get_session", "require_tier"]
_BYOK_CONFIG = Path.home() / ".config" / "circuitforge" / "llm.yaml"
def _detect_byok() -> bool:
try:
import yaml
with open(_BYOK_CONFIG) as f:
cfg = yaml.safe_load(f) or {}
return any(
b.get("enabled", True) and b.get("type") != "vision_service"
for b in cfg.get("backends", {}).values()
)
except Exception:
return False
_factory = CloudSessionFactory(
product="avocet",
byok_detector=_detect_byok,
)
get_session = _factory.dependency()
require_tier = _factory.require_tier

View file

@ -24,7 +24,7 @@ from urllib.request import Request, urlopen
import httpx import httpx
import yaml import yaml
from fastapi import APIRouter, HTTPException from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import StreamingResponse from fastapi.responses import StreamingResponse
from pydantic import BaseModel from pydantic import BaseModel
@ -256,6 +256,7 @@ def _run_cftext(
system: str, system: str,
temperature: float, temperature: float,
startup_timeout_s: float = 180.0, startup_timeout_s: float = 180.0,
user_id: str | None = None,
) -> tuple[str, int, bool]: ) -> tuple[str, int, bool]:
"""Allocate cf-text via cf-orch, generate, release. Returns (response, elapsed_ms, cold_started). """Allocate cf-text via cf-orch, generate, release. Returns (response, elapsed_ms, cold_started).
@ -273,6 +274,7 @@ def _run_cftext(
"model_candidates": [model_id], "model_candidates": [model_id],
"caller": "avocet", "caller": "avocet",
"pipeline": "imitate", "pipeline": "imitate",
**({"user_id": user_id} if user_id else {}),
}, },
timeout=30.0, timeout=30.0,
) )
@ -460,6 +462,15 @@ def get_catalog() -> dict:
# ── GET /run (SSE) ───────────────────────────────────────────────────────────── # ── GET /run (SSE) ─────────────────────────────────────────────────────────────
def _get_imitate_session(request: Any, response: Any) -> "CloudUser | None":
"""Optional session dependency — returns None when cloud_session is unavailable."""
try:
from app.cloud_session import get_session
return get_session(request, response)
except Exception:
return None
@router.get("/run") @router.get("/run")
def run_imitate( def run_imitate(
prompt: str = "", prompt: str = "",
@ -469,6 +480,7 @@ def run_imitate(
product_id: str = "", product_id: str = "",
system: str = "", # optional system prompt system: str = "", # optional system prompt
image_url: str = "", # optional image URL for vision models image_url: str = "", # optional image URL for vision models
session: "Any" = Depends(_get_imitate_session),
) -> StreamingResponse: ) -> StreamingResponse:
"""Run a prompt through selected ollama models and stream results as SSE. """Run a prompt through selected ollama models and stream results as SSE.
@ -534,9 +546,17 @@ def run_imitate(
for model_id in cftext_ids: for model_id in cftext_ids:
yield _sse({"type": "model_start", "model": model_id, "service": "cf-text"}) yield _sse({"type": "model_start", "model": model_id, "service": "cf-text"})
_user_id: str | None = getattr(session, "user_id", None)
# Only forward real cloud user IDs — skip local/anon sessions
if _user_id in (None, "local", "local-dev") or (_user_id or "").startswith("anon-"):
_user_id = None
with ThreadPoolExecutor(max_workers=len(cftext_ids)) as pool: with ThreadPoolExecutor(max_workers=len(cftext_ids)) as pool:
future_to_model = { future_to_model = {
pool.submit(_run_cftext, cforch_base, mid, prompt, system_ctx, temperature): mid pool.submit(
_run_cftext, cforch_base, mid, prompt, system_ctx, temperature,
180.0, _user_id,
): mid
for mid in cftext_ids for mid in cftext_ids
} }
for future in as_completed(future_to_model): for future in as_completed(future_to_model):