# Per-user LLM backend configuration file consulted for BYOK detection.
_BYOK_CONFIG = Path.home() / ".config" / "circuitforge" / "llm.yaml"


def _detect_byok() -> bool:
    """Report whether the local LLM config declares a usable non-vision backend.

    Reads ``_BYOK_CONFIG`` and returns True when at least one entry under its
    ``backends`` mapping is enabled (an entry without an ``enabled`` key counts
    as enabled) and has a ``type`` other than ``"vision_service"``.

    Detection is best-effort and never raises: a missing PyYAML install, a
    missing or unreadable config file, or a malformed document all yield False.

    Returns:
        True if a qualifying backend entry exists, otherwise False.
    """
    import logging

    try:
        # PyYAML is imported lazily so that its absence simply means "no BYOK".
        import yaml

        # Decode explicitly as UTF-8 instead of relying on the platform locale.
        with open(_BYOK_CONFIG, encoding="utf-8") as f:
            cfg = yaml.safe_load(f) or {}
    except (ImportError, OSError):
        # Expected situations: PyYAML not installed, or no config file present.
        return False
    except Exception:
        # Undecodable or malformed YAML: stay best-effort, but leave a trace so
        # a broken config is not silently treated as "no backend configured".
        logging.getLogger(__name__).warning(
            "Could not parse BYOK config %s", _BYOK_CONFIG, exc_info=True
        )
        return False

    backends = cfg.get("backends") if isinstance(cfg, dict) else None
    if not isinstance(backends, dict):
        return False

    # Skip entries that are not mappings so one malformed backend cannot hide
    # other, valid ones.
    return any(
        isinstance(b, dict)
        and b.get("enabled", True)
        and b.get("type") != "vision_service"
        for b in backends.values()
    )
# Imported at module level (not inside the function) because FastAPI resolves
# dependency parameter annotations against the module namespace.
from fastapi import Request, Response


def _get_imitate_session(request: Request, response: Response) -> "CloudUser | None":
    """Optional session dependency: resolve the cloud session, or None.

    The parameters are annotated with ``Request`` and ``Response`` so that
    FastAPI injects the live request/response objects; parameters annotated
    with any other type are treated as request parameters supplied by the
    client instead.

    Args:
        request: Incoming request, injected by FastAPI.
        response: Outgoing response, injected by FastAPI.

    Returns:
        Whatever ``app.cloud_session.get_session(request, response)`` returns,
        or None when ``app.cloud_session`` cannot be imported or the session
        lookup raises.
    """
    import logging

    try:
        from app.cloud_session import get_session

        return get_session(request, response)
    except ImportError:
        # cloud_session (or one of its dependencies) is not installed:
        # run without a session.
        return None
    except Exception:
        # Stay best-effort so the route keeps working without a session, but
        # record the failure instead of hiding it.
        logging.getLogger(__name__).warning(
            "Cloud session lookup failed; continuing without a session",
            exc_info=True,
        )
        return None