feat: affiliates + preferences modules v0.7.0 (closes #21, #22) #25

Merged
pyr0ball merged 12 commits from feature/affiliates-module into main 2026-04-04 19:14:25 -07:00
24 changed files with 1518 additions and 9 deletions

View file

@ -0,0 +1,60 @@
name: Build and publish cf-orch Docker image
on:
push:
tags:
- "v*"
workflow_dispatch:
env:
REGISTRY: ghcr.io
IMAGE_NAME: circuit-forge/cf-orch
jobs:
build-and-push:
runs-on: ubuntu-latest
permissions:
contents: read
packages: write
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Extract version from tag
id: meta
run: |
TAG="${GITHUB_REF_NAME}"
echo "tag=${TAG}" >> "$GITHUB_OUTPUT"
echo "image=${REGISTRY}/${IMAGE_NAME}" >> "$GITHUB_OUTPUT"
- name: Log in to GHCR
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Build and push
uses: docker/build-push-action@v5
with:
context: .
file: Dockerfile.orch
push: true
tags: |
${{ steps.meta.outputs.image }}:latest
${{ steps.meta.outputs.image }}:${{ steps.meta.outputs.tag }}
cache-from: type=gha
cache-to: type=gha,mode=max
labels: |
org.opencontainers.image.version=${{ steps.meta.outputs.tag }}
org.opencontainers.image.revision=${{ github.sha }}
- name: Summary
run: |
echo "### Published" >> "$GITHUB_STEP_SUMMARY"
echo "- \`${{ steps.meta.outputs.image }}:latest\`" >> "$GITHUB_STEP_SUMMARY"
echo "- \`${{ steps.meta.outputs.image }}:${{ steps.meta.outputs.tag }}\`" >> "$GITHUB_STEP_SUMMARY"

View file

@ -6,6 +6,25 @@ Versions follow [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
---
## [0.7.0] — 2026-04-04
### Added
**`circuitforge_core.affiliates`** — affiliate link wrapping module (closes #21)
- `wrap_url(url, retailer, user_id, get_preference)` — resolution order: opt-out → BYOK → CF env var → plain URL
- `AffiliateProgram` frozen dataclass + `register_program()` / `get_program()` registry
- Built-in programs: eBay Partner Network (`EBAY_AFFILIATE_CAMPAIGN_ID`), Amazon Associates (`AMAZON_ASSOCIATES_TAG`)
- `get_disclosure_text(retailer)` — per-retailer tooltip copy + `BANNER_COPY` first-encounter constants
- `get_preference` callable injection for opt-out + BYOK without hard-wiring a storage backend
**`circuitforge_core.preferences`** — preference persistence helpers (closes #22 self-hosted path)
- `LocalFileStore` — YAML-backed single-user preference store (`~/.config/circuitforge/preferences.yaml`)
- `get_user_preference(user_id, path, default, store)` + `set_user_preference(user_id, path, value, store)`
- `PreferenceStore` protocol — Heimdall cloud backend to follow once Heimdall#5 lands
- Dot-path utilities `get_path` / `set_path` (immutable nested dict read/write)
---
## [0.5.0] — 2026-04-02
### Added

53
Dockerfile.orch Normal file
View file

@ -0,0 +1,53 @@
# cf-orch coordinator image
# Includes the coordinator + agent; designed for paid+ multi-node deployments.
#
# Usage (coordinator node):
# docker run -d \
# -p 7700:7700 \
# -e HEIMDALL_URL=https://license.circuitforge.tech \
# -e HEIMDALL_MIN_TIER=paid \
# -e CF_ORCH_AUTH_SECRET=<secret> \
# ghcr.io/circuit-forge/cf-orch:latest coordinator
#
# Usage (GPU agent node — connects back to coordinator):
# docker run -d \
# --gpus all \
# -e CF_COORDINATOR_URL=http://<coordinator-ip>:7700 \
# ghcr.io/circuit-forge/cf-orch:latest agent
#
# Environment variables
# ─────────────────────
# CF_ORCH_PORT Coordinator listen port (default: 7700)
# HEIMDALL_URL Enable license auth (omit for LAN-only / self-hosted)
# HEIMDALL_MIN_TIER Minimum tier required (default: paid)
# CF_ORCH_AUTH_SECRET Shared secret with Heimdall /licenses/verify
# CF_COORDINATOR_URL Agent mode: coordinator URL to register with
# CF_AGENT_GPU_IDS Comma-separated GPU indices for agent (default: 0)
FROM python:3.12-slim
LABEL org.opencontainers.image.source="https://git.opensourcesolarpunk.com/Circuit-Forge/circuitforge-core"
LABEL org.opencontainers.image.description="cf-orch coordinator and agent for CircuitForge multi-node GPU orchestration"
LABEL org.opencontainers.image.licenses="BSL-1.1"
WORKDIR /app
# System deps — httpx needs curl for connection reuse; avoid full dev toolchain
RUN apt-get update && apt-get install -y --no-install-recommends \
curl \
&& rm -rf /var/lib/apt/lists/*
# Install cf-core with the resources extra (coordinator + agent deps)
COPY pyproject.toml README.md ./
COPY circuitforge_core/ ./circuitforge_core/
RUN pip install --no-cache-dir ".[resources,manage]"
ENV CF_ORCH_PORT=7700
EXPOSE 7700
COPY docker/orch-entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]
CMD ["coordinator"]

View file

@ -1 +1 @@
__version__ = "0.1.0"
__version__ = "0.7.0"

View file

@ -0,0 +1,41 @@
"""Public API for circuitforge_core.affiliates.
Usage::
from circuitforge_core.affiliates import wrap_url, get_disclosure_text
# Wrap a URL — env-var mode (no preferences, no opt-out)
url = wrap_url("https://www.ebay.com/itm/123", retailer="ebay")
# Wrap a URL — with preference injection (opt-out + BYOK)
url = wrap_url(
"https://www.ebay.com/itm/123",
retailer="ebay",
user_id="u123",
get_preference=my_prefs_client.get,
)
# Frontend disclosure tooltip
text = get_disclosure_text("ebay")
# Register a product-specific program at startup
register_program(AffiliateProgram(
name="My Shop",
retailer_key="myshop",
env_var="MYSHOP_AFFILIATE_ID",
build_url=lambda url, id_: f"{url}?ref={id_}",
))
"""
from .disclosure import BANNER_COPY, get_disclosure_text
from .programs import AffiliateProgram, get_program, register_program, registered_keys
from .router import wrap_url
__all__ = [
"wrap_url",
"get_disclosure_text",
"BANNER_COPY",
"AffiliateProgram",
"register_program",
"get_program",
"registered_keys",
]

View file

@ -0,0 +1,49 @@
"""Affiliate disclosure copy constants.
Follows the plain-language disclosure design from the affiliate links design
doc. All copy is centralized here so products don't drift out of sync and
legal/copy review has a single file to audit.
"""
from __future__ import annotations
# Per-retailer tooltip copy (shown on hover/tap of affiliate link indicator)
_TOOLTIP: dict[str, str] = {
"ebay": (
"Affiliate link — CircuitForge earns a small commission if you purchase "
"on eBay. No purchase data is shared with us. [Opt out in Settings]"
),
"amazon": (
"Affiliate link — CircuitForge earns a small commission if you purchase "
"on Amazon. No purchase data is shared with us. [Opt out in Settings]"
),
}
_GENERIC_TOOLTIP = (
"Affiliate link — CircuitForge may earn a small commission if you purchase. "
"No purchase data is shared with us. [Opt out in Settings]"
)
# First-encounter banner copy (shown once, then preference saved)
BANNER_COPY: dict[str, str] = {
"title": "A note on purchase links",
"body": (
"Some links in this product go to retailers using our affiliate code. "
"When you click one, the retailer knows you came from CircuitForge. "
"We don't see or store what you buy. The retailer may track your "
"purchase — that's between you and them.\n\n"
"If you'd rather use plain links with no tracking code, you can opt "
"out in Settings."
),
"dismiss_label": "Got it",
"opt_out_label": "Opt out now",
"learn_more_label": "Learn more",
}
def get_disclosure_text(retailer: str) -> str:
"""Return the tooltip disclosure string for *retailer*.
Falls back to a generic string for unregistered retailers so callers
never receive an empty string.
"""
return _TOOLTIP.get(retailer, _GENERIC_TOOLTIP)

View file

@ -0,0 +1,103 @@
"""Affiliate program definitions and URL builders.
Each ``AffiliateProgram`` knows how to append its affiliate parameters to a
plain product URL. Built-in programs (eBay EPN, Amazon Associates) are
registered at module import time. Products can register additional programs
with ``register_program()``.
Affiliate IDs are read from environment variables at call time so they pick
up values set after process startup (useful in tests).
"""
from __future__ import annotations
import os
from dataclasses import dataclass
from typing import Callable
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
@dataclass(frozen=True)
class AffiliateProgram:
"""One affiliate program and its URL building logic.
Attributes:
name: Human-readable program name.
retailer_key: Matches the ``retailer=`` argument in ``wrap_url()``.
env_var: Environment variable holding CF's affiliate ID.
build_url: ``(plain_url, affiliate_id) -> affiliate_url`` callable.
"""
name: str
retailer_key: str
env_var: str
build_url: Callable[[str, str], str]
def cf_affiliate_id(self) -> str | None:
"""Return CF's configured affiliate ID, or None if the env var is unset/blank."""
val = os.environ.get(self.env_var, "").strip()
return val or None
# ---------------------------------------------------------------------------
# URL builders
# ---------------------------------------------------------------------------
def _build_ebay_url(url: str, affiliate_id: str) -> str:
"""Append eBay Partner Network parameters to a listing URL."""
sep = "&" if "?" in url else "?"
params = urlencode({
"mkcid": "1",
"mkrid": "711-53200-19255-0",
"siteid": "0",
"campid": affiliate_id,
"toolid": "10001",
"mkevt": "1",
})
return f"{url}{sep}{params}"
def _build_amazon_url(url: str, affiliate_id: str) -> str:
"""Merge an Amazon Associates tag into a product URL's query string."""
parsed = urlparse(url)
qs = parse_qs(parsed.query, keep_blank_values=True)
qs["tag"] = [affiliate_id]
new_query = urlencode({k: v[0] for k, v in qs.items()})
return urlunparse(parsed._replace(query=new_query))
# ---------------------------------------------------------------------------
# Registry
# ---------------------------------------------------------------------------
_REGISTRY: dict[str, AffiliateProgram] = {}
def register_program(program: AffiliateProgram) -> None:
"""Register an affiliate program (overwrites any existing entry for the same key)."""
_REGISTRY[program.retailer_key] = program
def get_program(retailer_key: str) -> AffiliateProgram | None:
"""Return the registered program for *retailer_key*, or None."""
return _REGISTRY.get(retailer_key)
def registered_keys() -> list[str]:
"""Return all currently registered retailer keys."""
return list(_REGISTRY.keys())
# Register built-ins
register_program(AffiliateProgram(
name="eBay Partner Network",
retailer_key="ebay",
env_var="EBAY_AFFILIATE_CAMPAIGN_ID",
build_url=_build_ebay_url,
))
register_program(AffiliateProgram(
name="Amazon Associates",
retailer_key="amazon",
env_var="AMAZON_ASSOCIATES_TAG",
build_url=_build_amazon_url,
))

View file

@ -0,0 +1,83 @@
"""Affiliate URL wrapping — resolution logic.
Resolution order (from affiliate links design doc):
1. User opted out? return plain URL
2. User has BYOK ID for this retailer? wrap with user's ID
3. CF has a program with env var set? wrap with CF's ID
4. No program / no ID configured return plain URL
The ``get_preference`` callable is optional. When None (default), steps 1
and 2 are skipped the module operates in env-var-only mode. Products
inject their preferences client to enable opt-out and BYOK.
Signature of ``get_preference``::
def get_preference(user_id: str | None, path: str, default=None) -> Any: ...
"""
from __future__ import annotations
import logging
from typing import Any, Callable
from .programs import get_program
logger = logging.getLogger(__name__)
GetPreferenceFn = Callable[[str | None, str, Any], Any]
def wrap_url(
url: str,
retailer: str,
user_id: str | None = None,
get_preference: GetPreferenceFn | None = None,
) -> str:
"""Return an affiliate URL for *url*, or the plain URL if no affiliate
link can be or should be generated.
Args:
url: Plain product URL to wrap.
retailer: Retailer key (e.g. ``"ebay"``, ``"amazon"``).
user_id: User identifier for preference lookups. None = anonymous.
get_preference: Optional callable ``(user_id, path, default) -> value``.
Injected by products to enable opt-out and BYOK resolution.
When None, opt-out and BYOK checks are skipped.
Returns:
Affiliate URL, or *url* unchanged if:
- The user has opted out
- No program is registered for *retailer*
- No affiliate ID is configured (env var unset and no BYOK)
"""
program = get_program(retailer)
if program is None:
logger.debug("affiliates: no program registered for retailer=%r", retailer)
return url
# Step 1: opt-out check
if get_preference is not None:
opted_out = get_preference(user_id, "affiliate.opt_out", False)
if opted_out:
logger.debug("affiliates: user %r opted out — returning plain URL", user_id)
return url
# Step 2: BYOK — user's own affiliate ID (Premium)
if get_preference is not None and user_id is not None:
byok_id = get_preference(user_id, f"affiliate.byok_ids.{retailer}", None)
if byok_id:
logger.debug(
"affiliates: using BYOK id for user=%r retailer=%r", user_id, retailer
)
return program.build_url(url, byok_id)
# Step 3: CF's affiliate ID from env var
cf_id = program.cf_affiliate_id()
if cf_id:
return program.build_url(url, cf_id)
logger.debug(
"affiliates: no affiliate ID configured for retailer=%r (env var %r unset)",
retailer, program.env_var,
)
return url

View file

@ -17,13 +17,80 @@ CONFIG_PATH = Path.home() / ".config" / "circuitforge" / "llm.yaml"
class LLMRouter:
def __init__(self, config_path: Path = CONFIG_PATH):
if not config_path.exists():
raise FileNotFoundError(
f"{config_path} not found. "
"Copy the llm.yaml.example to ~/.config/circuitforge/llm.yaml and configure your LLM backends."
if config_path.exists():
with open(config_path) as f:
self.config = yaml.safe_load(f)
else:
env_config = self._auto_config_from_env()
if env_config is None:
raise FileNotFoundError(
f"{config_path} not found and no LLM env vars detected. "
"Either copy llm.yaml.example to ~/.config/circuitforge/llm.yaml, "
"or set ANTHROPIC_API_KEY, OPENAI_API_KEY, or OLLAMA_HOST."
)
logger.info(
"[LLMRouter] No llm.yaml found — using env-var auto-config "
"(backends: %s)", ", ".join(env_config["fallback_order"])
)
with open(config_path) as f:
self.config = yaml.safe_load(f)
self.config = env_config
@staticmethod
def _auto_config_from_env() -> dict | None:
"""Build a minimal LLM config from well-known environment variables.
Priority order (highest to lowest):
1. ANTHROPIC_API_KEY anthropic backend
2. OPENAI_API_KEY openai-compat api.openai.com (or OPENAI_BASE_URL)
3. OLLAMA_HOST openai-compat local Ollama (always included as last resort)
Returns None only when none of these are set and Ollama is not configured,
so the caller can decide whether to raise or surface a user-facing message.
"""
backends: dict = {}
fallback_order: list[str] = []
if os.environ.get("ANTHROPIC_API_KEY"):
backends["anthropic"] = {
"type": "anthropic",
"enabled": True,
"model": os.environ.get("ANTHROPIC_MODEL", "claude-haiku-4-5-20251001"),
"api_key_env": "ANTHROPIC_API_KEY",
"supports_images": True,
}
fallback_order.append("anthropic")
if os.environ.get("OPENAI_API_KEY"):
backends["openai"] = {
"type": "openai_compat",
"enabled": True,
"base_url": os.environ.get("OPENAI_BASE_URL", "https://api.openai.com/v1"),
"model": os.environ.get("OPENAI_MODEL", "gpt-4o-mini"),
"api_key": os.environ.get("OPENAI_API_KEY"),
"supports_images": True,
}
fallback_order.append("openai")
# Ollama — always added when any config exists, as the lowest-cost local fallback.
# Unreachable Ollama is harmless — _is_reachable() skips it gracefully.
ollama_host = os.environ.get("OLLAMA_HOST", "http://localhost:11434")
if not ollama_host.startswith("http"):
ollama_host = f"http://{ollama_host}"
backends["ollama"] = {
"type": "openai_compat",
"enabled": True,
"base_url": ollama_host.rstrip("/") + "/v1",
"model": os.environ.get("OLLAMA_MODEL", "llama3.2:3b"),
"api_key": "any",
"supports_images": False,
}
fallback_order.append("ollama")
# Return None if only ollama is in the list AND no explicit host was set —
# that means the user set nothing at all, not even OLLAMA_HOST.
if fallback_order == ["ollama"] and "OLLAMA_HOST" not in os.environ:
return None
return {"backends": backends, "fallback_order": fallback_order}
def _is_reachable(self, base_url: str) -> bool:
"""Quick health-check ping. Returns True if backend is up."""

View file

@ -0,0 +1,47 @@
from . import store as store_module
from .paths import get_path, set_path
from .store import LocalFileStore, PreferenceStore
def get_user_preference(
user_id: str | None,
path: str,
default=None,
store: PreferenceStore | None = None,
):
"""Read a preference value at dot-separated *path*.
Args:
user_id: User identifier (passed to store; local store ignores it).
path: Dot-separated preference path, e.g. ``"affiliate.opt_out"``.
default: Returned when the path is not set.
store: Optional store override; defaults to ``LocalFileStore`` at
``~/.config/circuitforge/preferences.yaml``.
"""
s = store or store_module._DEFAULT_STORE
return s.get(user_id=user_id, path=path, default=default)
def set_user_preference(
user_id: str | None,
path: str,
value,
store: PreferenceStore | None = None,
) -> None:
"""Write *value* at dot-separated *path*.
Args:
user_id: User identifier (passed to store; local store ignores it).
path: Dot-separated preference path, e.g. ``"affiliate.byok_ids.ebay"``.
value: Value to persist.
store: Optional store override; defaults to ``LocalFileStore``.
"""
s = store or store_module._DEFAULT_STORE
s.set(user_id=user_id, path=path, value=value)
__all__ = [
"get_path", "set_path",
"get_user_preference", "set_user_preference",
"LocalFileStore", "PreferenceStore",
]

View file

@ -0,0 +1,64 @@
"""Dot-path utilities for reading and writing nested preference dicts.
All operations are immutable: set_path returns a new dict rather than
mutating the input.
Path format: dot-separated keys, e.g. "affiliate.byok_ids.ebay"
"""
from __future__ import annotations
from typing import Any
def get_path(data: dict, path: str, default: Any = None) -> Any:
"""Return the value at *path* inside *data*, or *default* if missing.
Example::
prefs = {"affiliate": {"opt_out": False, "byok_ids": {"ebay": "my-id"}}}
get_path(prefs, "affiliate.byok_ids.ebay") # "my-id"
get_path(prefs, "affiliate.missing", default="x") # "x"
"""
keys = path.split(".")
node: Any = data
for key in keys:
if not isinstance(node, dict):
return default
node = node.get(key, _SENTINEL)
if node is _SENTINEL:
return default
return node
def set_path(data: dict, path: str, value: Any) -> dict:
"""Return a new dict with *value* written at *path*.
Intermediate dicts are created as needed; existing values at other paths
are preserved. The original *data* dict is never mutated.
Example::
prefs = {}
updated = set_path(prefs, "affiliate.opt_out", True)
# {"affiliate": {"opt_out": True}}
"""
keys = path.split(".")
return _set_recursive(data, keys, value)
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
_SENTINEL = object()
def _set_recursive(node: Any, keys: list[str], value: Any) -> dict:
if not isinstance(node, dict):
node = {}
key, rest = keys[0], keys[1:]
if rest:
child = _set_recursive(node.get(key, {}), rest, value)
else:
child = value
return {**node, key: child}

View file

@ -0,0 +1,78 @@
"""Preference store backends.
``LocalFileStore`` reads and writes a single YAML file at a configurable
path (default: ``~/.config/circuitforge/preferences.yaml``).
The ``PreferenceStore`` protocol describes the interface any backend must
satisfy. The Heimdall cloud backend will implement the same protocol once
Heimdall#5 (user_preferences column) lands — products swap backends by
passing a different store instance.
"""
from __future__ import annotations
import logging
from pathlib import Path
from typing import Any, Protocol, runtime_checkable
from .paths import get_path, set_path
logger = logging.getLogger(__name__)
_DEFAULT_PREFS_PATH = Path.home() / ".config" / "circuitforge" / "preferences.yaml"
@runtime_checkable
class PreferenceStore(Protocol):
"""Read/write interface for user preferences.
``user_id`` is passed through for cloud backends that store per-user
data. Local single-user backends accept it but ignore it.
"""
def get(self, user_id: str | None, path: str, default: Any = None) -> Any:
"""Return the value at *path*, or *default* if missing."""
...
def set(self, user_id: str | None, path: str, value: Any) -> None:
"""Persist *value* at *path*."""
...
class LocalFileStore:
"""Single-user preference store backed by a YAML file.
Thread-safe for typical single-process use (reads the file on every
``get`` call, writes atomically via a temp-file rename on ``set``).
Not suitable for concurrent multi-process writes.
"""
def __init__(self, prefs_path: Path = _DEFAULT_PREFS_PATH) -> None:
self._path = Path(prefs_path)
def _load(self) -> dict:
if not self._path.exists():
return {}
try:
import yaml # type: ignore[import]
text = self._path.read_text(encoding="utf-8")
data = yaml.safe_load(text)
return data if isinstance(data, dict) else {}
except Exception as exc:
logger.warning("preferences: could not read %s: %s", self._path, exc)
return {}
def _save(self, data: dict) -> None:
import yaml # type: ignore[import]
self._path.parent.mkdir(parents=True, exist_ok=True)
tmp = self._path.with_suffix(".yaml.tmp")
tmp.write_text(yaml.safe_dump(data, default_flow_style=False), encoding="utf-8")
tmp.replace(self._path)
def get(self, user_id: str | None, path: str, default: Any = None) -> Any: # noqa: ARG002
return get_path(self._load(), path, default=default)
def set(self, user_id: str | None, path: str, value: Any) -> None: # noqa: ARG002
self._save(set_path(self._load(), path, value))
_DEFAULT_STORE: PreferenceStore = LocalFileStore()

View file

@ -1,6 +1,7 @@
from __future__ import annotations
import logging
import os
from contextlib import contextmanager, asynccontextmanager
from dataclasses import dataclass
@ -34,13 +35,25 @@ class CFOrchClient:
async with client.allocate_async("vllm", model_candidates=["Ouro-1.4B"]) as alloc:
...
Authentication:
Pass api_key explicitly, or set CF_LICENSE_KEY env var. When set, every
request carries Authorization: Bearer <key>. Required for the hosted
CircuitForge coordinator (orch.circuitforge.tech); optional for local
self-hosted coordinators.
Raises ValueError immediately if coordinator_url is empty.
"""
def __init__(self, coordinator_url: str) -> None:
def __init__(self, coordinator_url: str, api_key: str | None = None) -> None:
if not coordinator_url:
raise ValueError("coordinator_url is empty — cf-orch not configured")
self._url = coordinator_url.rstrip("/")
self._api_key = api_key or os.environ.get("CF_LICENSE_KEY", "")
def _headers(self) -> dict[str, str]:
if self._api_key:
return {"Authorization": f"Bearer {self._api_key}"}
return {}
def _build_body(self, model_candidates: list[str] | None, ttl_s: float, caller: str) -> dict:
return {
@ -74,6 +87,7 @@ class CFOrchClient:
resp = httpx.post(
f"{self._url}/api/services/{service}/allocate",
json=self._build_body(model_candidates, ttl_s, caller),
headers=self._headers(),
timeout=120.0,
)
if not resp.is_success:
@ -88,6 +102,7 @@ class CFOrchClient:
try:
httpx.delete(
f"{self._url}/api/services/{service}/allocations/{alloc.allocation_id}",
headers=self._headers(),
timeout=10.0,
)
except Exception as exc:
@ -107,6 +122,7 @@ class CFOrchClient:
resp = await client.post(
f"{self._url}/api/services/{service}/allocate",
json=self._build_body(model_candidates, ttl_s, caller),
headers=self._headers(),
)
if not resp.is_success:
raise RuntimeError(
@ -120,6 +136,7 @@ class CFOrchClient:
try:
await client.delete(
f"{self._url}/api/services/{service}/allocations/{alloc.allocation_id}",
headers=self._headers(),
timeout=10.0,
)
except Exception as exc:

View file

@ -133,6 +133,14 @@ def create_coordinator_app(
app = FastAPI(title="cf-orch-coordinator", lifespan=_lifespan)
# Optional Heimdall auth — enabled when HEIMDALL_URL env var is set.
# Self-hosted coordinators skip this entirely; the CF-hosted public endpoint
# (orch.circuitforge.tech) sets HEIMDALL_URL to gate paid+ access.
from circuitforge_core.resources.coordinator.auth import HeimdallAuthMiddleware
_auth = HeimdallAuthMiddleware.from_env()
if _auth is not None:
app.middleware("http")(_auth)
@app.get("/", response_class=HTMLResponse, include_in_schema=False)
def dashboard() -> HTMLResponse:
return HTMLResponse(content=_DASHBOARD_HTML)

View file

@ -0,0 +1,197 @@
"""
cf-orch coordinator auth middleware.
When HEIMDALL_URL is set, all /api/* requests (except /api/health) must carry:
Authorization: Bearer <CF license key>
The key is validated against Heimdall and the result cached for
CACHE_TTL_S seconds (default 300 / 5 min). This keeps Heimdall out of the
per-allocation hot path while keeping revocation latency bounded.
When HEIMDALL_URL is not set, auth is disabled self-hosted deployments work
with no configuration change.
Environment variables
---------------------
HEIMDALL_URL Heimdall base URL, e.g. https://license.circuitforge.tech
When absent, auth is skipped entirely.
HEIMDALL_MIN_TIER Minimum tier required (default: "paid").
Accepted values: free, paid, premium, ultra.
CF_ORCH_AUTH_SECRET Shared secret sent to Heimdall so it can distinguish
coordinator service calls from end-user requests.
Must match the COORDINATOR_SECRET env var on Heimdall.
"""
from __future__ import annotations
import logging
import os
import time
from dataclasses import dataclass, field
from threading import Lock
import httpx
from fastapi import Request
from fastapi.responses import JSONResponse
logger = logging.getLogger(__name__)
# Unauthenticated paths — health check must always be accessible for monitoring.
_EXEMPT_PATHS: frozenset[str] = frozenset({"/api/health", "/", "/openapi.json", "/docs", "/redoc"})
_TIER_ORDER: dict[str, int] = {"free": 0, "paid": 1, "premium": 2, "ultra": 3}
CACHE_TTL_S: float = 300.0 # 5 minutes — matches Kiwi cloud session TTL
@dataclass
class _CacheEntry:
valid: bool
tier: str
user_id: str
expires_at: float
class _ValidationCache:
"""Thread-safe TTL cache for Heimdall validation results."""
def __init__(self, ttl_s: float = CACHE_TTL_S) -> None:
self._ttl = ttl_s
self._store: dict[str, _CacheEntry] = {}
self._lock = Lock()
def get(self, key: str) -> _CacheEntry | None:
with self._lock:
entry = self._store.get(key)
if entry is None or time.monotonic() > entry.expires_at:
return None
return entry
def set(self, key: str, valid: bool, tier: str, user_id: str) -> None:
with self._lock:
self._store[key] = _CacheEntry(
valid=valid,
tier=tier,
user_id=user_id,
expires_at=time.monotonic() + self._ttl,
)
def evict(self, key: str) -> None:
with self._lock:
self._store.pop(key, None)
def prune(self) -> int:
"""Remove expired entries. Returns count removed."""
now = time.monotonic()
with self._lock:
expired = [k for k, e in self._store.items() if now > e.expires_at]
for k in expired:
del self._store[k]
return len(expired)
class HeimdallAuthMiddleware:
"""
ASGI middleware that validates CF license keys against Heimdall.
Attach to a FastAPI app via app.middleware("http"):
middleware = HeimdallAuthMiddleware.from_env()
if middleware:
app.middleware("http")(middleware)
"""
def __init__(
self,
heimdall_url: str,
min_tier: str = "paid",
auth_secret: str = "",
cache_ttl_s: float = CACHE_TTL_S,
) -> None:
self._heimdall = heimdall_url.rstrip("/")
self._min_tier_rank = _TIER_ORDER.get(min_tier, 1)
self._min_tier = min_tier
self._auth_secret = auth_secret
self._cache = _ValidationCache(ttl_s=cache_ttl_s)
logger.info(
"[cf-orch auth] Heimdall auth enabled — url=%s min_tier=%s ttl=%ss",
self._heimdall, min_tier, cache_ttl_s,
)
@classmethod
def from_env(cls) -> "HeimdallAuthMiddleware | None":
"""Return a configured middleware instance, or None if HEIMDALL_URL is not set."""
url = os.environ.get("HEIMDALL_URL", "")
if not url:
logger.info("[cf-orch auth] HEIMDALL_URL not set — auth disabled (self-hosted mode)")
return None
return cls(
heimdall_url=url,
min_tier=os.environ.get("HEIMDALL_MIN_TIER", "paid"),
auth_secret=os.environ.get("CF_ORCH_AUTH_SECRET", ""),
)
def _validate_against_heimdall(self, license_key: str) -> tuple[bool, str, str]:
"""
Call Heimdall's /licenses/verify endpoint.
Returns (valid, tier, user_id).
On any network or parse error, returns (False, "", "") fail closed.
"""
try:
headers: dict[str, str] = {"Content-Type": "application/json"}
if self._auth_secret:
headers["X-Coordinator-Secret"] = self._auth_secret
resp = httpx.post(
f"{self._heimdall}/licenses/verify",
json={"key": license_key, "min_tier": self._min_tier},
headers=headers,
timeout=5.0,
)
if resp.status_code == 200:
data = resp.json()
return data.get("valid", False), data.get("tier", ""), data.get("user_id", "")
# 401/403 from Heimdall = key invalid/insufficient tier
logger.debug("[cf-orch auth] Heimdall returned %s for key ...%s", resp.status_code, license_key[-6:])
return False, "", ""
except Exception as exc:
logger.warning("[cf-orch auth] Heimdall unreachable — failing closed: %s", exc)
return False, "", ""
def _check_key(self, license_key: str) -> tuple[bool, str]:
"""
Validate key (cache-first). Returns (authorized, reason_if_denied).
"""
cached = self._cache.get(license_key)
if cached is not None:
if not cached.valid:
return False, "license key invalid or expired"
if _TIER_ORDER.get(cached.tier, -1) < self._min_tier_rank:
return False, f"feature requires {self._min_tier} tier (have: {cached.tier})"
return True, ""
valid, tier, user_id = self._validate_against_heimdall(license_key)
self._cache.set(license_key, valid=valid, tier=tier, user_id=user_id)
if not valid:
return False, "license key invalid or expired"
if _TIER_ORDER.get(tier, -1) < self._min_tier_rank:
return False, f"feature requires {self._min_tier} tier (have: {tier})"
return True, ""
async def __call__(self, request: Request, call_next): # type: ignore[no-untyped-def]
if request.url.path in _EXEMPT_PATHS:
return await call_next(request)
auth_header = request.headers.get("Authorization", "")
if not auth_header.startswith("Bearer "):
return JSONResponse(
status_code=401,
content={"detail": "Authorization: Bearer <license_key> required"},
)
license_key = auth_header.removeprefix("Bearer ").strip()
authorized, reason = self._check_key(license_key)
if not authorized:
return JSONResponse(status_code=403, content={"detail": reason})
return await call_next(request)

25
docker/orch-entrypoint.sh Normal file
View file

@ -0,0 +1,25 @@
#!/bin/bash
set -e
MODE="${1:-coordinator}"
PORT="${CF_ORCH_PORT:-7700}"
case "$MODE" in
coordinator)
echo "[cf-orch] Starting coordinator on port $PORT"
exec python -m circuitforge_core.resources.cli coordinator \
--host 0.0.0.0 --port "$PORT"
;;
agent)
COORDINATOR="${CF_COORDINATOR_URL:?CF_COORDINATOR_URL must be set for agent mode}"
GPU_IDS="${CF_AGENT_GPU_IDS:-0}"
echo "[cf-orch] Starting agent — coordinator=$COORDINATOR gpu_ids=$GPU_IDS"
exec python -m circuitforge_core.resources.cli agent \
--coordinator "$COORDINATOR" \
--gpu-ids "$GPU_IDS"
;;
*)
echo "Usage: cf-orch [coordinator|agent]"
exit 1
;;
esac

View file

@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "circuitforge-core"
version = "0.5.0"
version = "0.7.0"
description = "Shared scaffold for CircuitForge products"
requires-python = ">=3.11"
dependencies = [

View file

View file

@ -0,0 +1,29 @@
"""Tests for affiliate disclosure text."""
import pytest
from circuitforge_core.affiliates.disclosure import get_disclosure_text, BANNER_COPY
class TestGetDisclosureText:
def test_returns_string_for_known_retailer(self):
text = get_disclosure_text("ebay")
assert isinstance(text, str)
assert len(text) > 0
def test_ebay_copy_mentions_ebay(self):
text = get_disclosure_text("ebay")
assert "eBay" in text
def test_amazon_copy_mentions_amazon(self):
text = get_disclosure_text("amazon")
assert "Amazon" in text
def test_unknown_retailer_returns_generic(self):
text = get_disclosure_text("not_a_retailer")
assert isinstance(text, str)
assert len(text) > 0
def test_banner_copy_has_required_keys(self):
assert "title" in BANNER_COPY
assert "body" in BANNER_COPY
assert "opt_out_label" in BANNER_COPY
assert "dismiss_label" in BANNER_COPY

View file

@ -0,0 +1,60 @@
"""Integration tests — full wrap_url() round-trip through public API."""
import pytest
from circuitforge_core.affiliates import wrap_url, get_disclosure_text
class TestEbayIntegration:
def test_full_flow_with_env_var(self, monkeypatch):
monkeypatch.setenv("EBAY_AFFILIATE_CAMPAIGN_ID", "cf-snipe-999")
url = wrap_url("https://www.ebay.com/itm/987654321", retailer="ebay")
assert "campid=cf-snipe-999" in url
assert "mkcid=1" in url
assert "mkevt=1" in url
def test_full_flow_with_opt_out(self, monkeypatch):
monkeypatch.setenv("EBAY_AFFILIATE_CAMPAIGN_ID", "cf-snipe-999")
def get_pref(user_id, path, default=None):
if path == "affiliate.opt_out":
return True
return default
result = wrap_url(
"https://www.ebay.com/itm/987654321",
retailer="ebay",
user_id="u99",
get_preference=get_pref,
)
assert result == "https://www.ebay.com/itm/987654321"
def test_disclosure_text_available(self):
text = get_disclosure_text("ebay")
assert "eBay" in text
assert len(text) > 20
class TestAmazonIntegration:
def test_full_flow_with_env_var(self, monkeypatch):
monkeypatch.setenv("AMAZON_ASSOCIATES_TAG", "cf-kiwi-20")
url = wrap_url("https://www.amazon.com/dp/B00TEST1234", retailer="amazon")
assert "tag=cf-kiwi-20" in url
def test_preserves_existing_query_params(self, monkeypatch):
monkeypatch.setenv("AMAZON_ASSOCIATES_TAG", "cf-kiwi-20")
url = wrap_url(
"https://www.amazon.com/dp/B00TEST?ref=sr_1_1&keywords=flour",
retailer="amazon",
)
assert "tag=cf-kiwi-20" in url
assert "ref=sr_1_1" in url
assert "keywords=flour" in url
class TestNoEnvVar:
def test_plain_url_returned_when_no_env_var(self, monkeypatch):
monkeypatch.delenv("EBAY_AFFILIATE_CAMPAIGN_ID", raising=False)
monkeypatch.delenv("AMAZON_ASSOCIATES_TAG", raising=False)
ebay_url = "https://www.ebay.com/itm/1"
amazon_url = "https://www.amazon.com/dp/B001"
assert wrap_url(ebay_url, retailer="ebay") == ebay_url
assert wrap_url(amazon_url, retailer="amazon") == amazon_url

View file

@ -0,0 +1,99 @@
"""Tests for affiliate program registry and URL builders."""
import pytest
from circuitforge_core.affiliates.programs import (
AffiliateProgram,
get_program,
register_program,
registered_keys,
_build_ebay_url,
_build_amazon_url,
)
class TestAffiliateProgram:
def test_cf_affiliate_id_returns_env_value(self, monkeypatch):
monkeypatch.setenv("TEST_AFF_ID", "my-id")
prog = AffiliateProgram(
name="Test", retailer_key="test",
env_var="TEST_AFF_ID", build_url=lambda u, i: u
)
assert prog.cf_affiliate_id() == "my-id"
def test_cf_affiliate_id_returns_none_when_unset(self, monkeypatch):
monkeypatch.delenv("TEST_AFF_ID", raising=False)
prog = AffiliateProgram(
name="Test", retailer_key="test",
env_var="TEST_AFF_ID", build_url=lambda u, i: u
)
assert prog.cf_affiliate_id() is None
def test_cf_affiliate_id_returns_none_when_blank(self, monkeypatch):
monkeypatch.setenv("TEST_AFF_ID", " ")
prog = AffiliateProgram(
name="Test", retailer_key="test",
env_var="TEST_AFF_ID", build_url=lambda u, i: u
)
assert prog.cf_affiliate_id() is None
class TestRegistry:
def test_builtin_ebay_registered(self):
assert get_program("ebay") is not None
assert get_program("ebay").name == "eBay Partner Network"
def test_builtin_amazon_registered(self):
assert get_program("amazon") is not None
assert get_program("amazon").name == "Amazon Associates"
def test_unknown_key_returns_none(self):
assert get_program("not_a_retailer") is None
def test_register_custom_program(self):
prog = AffiliateProgram(
name="Custom Shop", retailer_key="customshop",
env_var="CUSTOM_ID", build_url=lambda u, i: f"{u}?ref={i}"
)
register_program(prog)
assert get_program("customshop") is prog
assert "customshop" in registered_keys()
def test_register_overwrites_existing(self):
prog1 = AffiliateProgram("A", "overwrite_test", "X", lambda u, i: u)
prog2 = AffiliateProgram("B", "overwrite_test", "Y", lambda u, i: u)
register_program(prog1)
register_program(prog2)
assert get_program("overwrite_test").name == "B"
class TestEbayUrlBuilder:
def test_appends_params_to_plain_url(self):
url = _build_ebay_url("https://www.ebay.com/itm/123", "my-campaign")
assert "campid=my-campaign" in url
assert "mkcid=1" in url
assert "mkevt=1" in url
assert url.startswith("https://www.ebay.com/itm/123?")
def test_uses_ampersand_when_query_already_present(self):
url = _build_ebay_url("https://www.ebay.com/itm/123?existing=1", "c1")
assert url.startswith("https://www.ebay.com/itm/123?existing=1&")
assert "campid=c1" in url
def test_does_not_double_encode(self):
url = _build_ebay_url("https://www.ebay.com/itm/123", "camp-id-1")
assert "camp-id-1" in url
class TestAmazonUrlBuilder:
def test_appends_tag_to_plain_url(self):
url = _build_amazon_url("https://www.amazon.com/dp/B001234567", "cf-kiwi-20")
assert "tag=cf-kiwi-20" in url
def test_merges_tag_into_existing_query(self):
url = _build_amazon_url("https://www.amazon.com/dp/B001234567?ref=sr_1_1", "cf-kiwi-20")
assert "tag=cf-kiwi-20" in url
assert "ref=sr_1_1" in url
def test_replaces_existing_tag(self):
url = _build_amazon_url("https://www.amazon.com/dp/B001?tag=old-tag-20", "new-tag-20")
assert "tag=new-tag-20" in url
assert "old-tag-20" not in url

View file

@ -0,0 +1,114 @@
"""Tests for affiliate URL wrapping resolution logic."""
import pytest
from circuitforge_core.affiliates.router import wrap_url
def _pref_store(prefs: dict):
"""Return a get_preference callable backed by a plain dict."""
def get_preference(user_id, path, default=None):
keys = path.split(".")
node = prefs
for k in keys:
if not isinstance(node, dict):
return default
node = node.get(k)
if node is None:
return default
return node
return get_preference
class TestWrapUrlEnvVarMode:
"""No get_preference injected — env-var-only mode."""
def test_returns_affiliate_url_when_env_set(self, monkeypatch):
monkeypatch.setenv("EBAY_AFFILIATE_CAMPAIGN_ID", "camp123")
result = wrap_url("https://www.ebay.com/itm/1", retailer="ebay")
assert "campid=camp123" in result
def test_returns_plain_url_when_env_unset(self, monkeypatch):
monkeypatch.delenv("EBAY_AFFILIATE_CAMPAIGN_ID", raising=False)
result = wrap_url("https://www.ebay.com/itm/1", retailer="ebay")
assert result == "https://www.ebay.com/itm/1"
def test_returns_plain_url_for_unknown_retailer(self, monkeypatch):
monkeypatch.setenv("EBAY_AFFILIATE_CAMPAIGN_ID", "camp123")
result = wrap_url("https://www.example.com/item/1", retailer="unknown_shop")
assert result == "https://www.example.com/item/1"
def test_amazon_env_var(self, monkeypatch):
monkeypatch.setenv("AMAZON_ASSOCIATES_TAG", "cf-kiwi-20")
result = wrap_url("https://www.amazon.com/dp/B001", retailer="amazon")
assert "tag=cf-kiwi-20" in result
class TestWrapUrlOptOut:
"""get_preference injected — opt-out enforcement."""
def test_opted_out_returns_plain_url(self, monkeypatch):
monkeypatch.setenv("EBAY_AFFILIATE_CAMPAIGN_ID", "camp123")
get_pref = _pref_store({"affiliate": {"opt_out": True}})
result = wrap_url(
"https://www.ebay.com/itm/1", retailer="ebay",
user_id="u1", get_preference=get_pref,
)
assert result == "https://www.ebay.com/itm/1"
def test_opted_in_returns_affiliate_url(self, monkeypatch):
monkeypatch.setenv("EBAY_AFFILIATE_CAMPAIGN_ID", "camp123")
get_pref = _pref_store({"affiliate": {"opt_out": False}})
result = wrap_url(
"https://www.ebay.com/itm/1", retailer="ebay",
user_id="u1", get_preference=get_pref,
)
assert "campid=camp123" in result
def test_no_preference_set_defaults_to_opted_in(self, monkeypatch):
"""Missing opt_out key = opted in (default behaviour per design doc)."""
monkeypatch.setenv("EBAY_AFFILIATE_CAMPAIGN_ID", "camp123")
get_pref = _pref_store({})
result = wrap_url(
"https://www.ebay.com/itm/1", retailer="ebay",
user_id="u1", get_preference=get_pref,
)
assert "campid=camp123" in result
class TestWrapUrlByok:
"""BYOK affiliate ID takes precedence over CF's ID."""
def test_byok_id_used_instead_of_cf_id(self, monkeypatch):
monkeypatch.setenv("EBAY_AFFILIATE_CAMPAIGN_ID", "cf-camp")
get_pref = _pref_store({
"affiliate": {
"opt_out": False,
"byok_ids": {"ebay": "user-own-camp"},
}
})
result = wrap_url(
"https://www.ebay.com/itm/1", retailer="ebay",
user_id="u1", get_preference=get_pref,
)
assert "campid=user-own-camp" in result
assert "cf-camp" not in result
def test_byok_only_used_when_present(self, monkeypatch):
monkeypatch.setenv("EBAY_AFFILIATE_CAMPAIGN_ID", "cf-camp")
get_pref = _pref_store({"affiliate": {"opt_out": False, "byok_ids": {}}})
result = wrap_url(
"https://www.ebay.com/itm/1", retailer="ebay",
user_id="u1", get_preference=get_pref,
)
assert "campid=cf-camp" in result
def test_byok_without_user_id_not_applied(self, monkeypatch):
"""BYOK requires a user_id — anonymous users get CF's ID."""
monkeypatch.setenv("EBAY_AFFILIATE_CAMPAIGN_ID", "cf-camp")
get_pref = _pref_store({
"affiliate": {"opt_out": False, "byok_ids": {"ebay": "user-own-camp"}}
})
result = wrap_url(
"https://www.ebay.com/itm/1", retailer="ebay",
user_id=None, get_preference=get_pref,
)
assert "campid=cf-camp" in result

148
tests/test_preferences.py Normal file
View file

@ -0,0 +1,148 @@
"""Tests for circuitforge_core.preferences path utilities."""
import pytest
from circuitforge_core.preferences import get_path, set_path
class TestGetPath:
def test_top_level_key(self):
assert get_path({"a": 1}, "a") == 1
def test_nested_key(self):
data = {"affiliate": {"opt_out": False}}
assert get_path(data, "affiliate.opt_out") is False
def test_deeply_nested(self):
data = {"affiliate": {"byok_ids": {"ebay": "my-tag"}}}
assert get_path(data, "affiliate.byok_ids.ebay") == "my-tag"
def test_missing_key_returns_default(self):
assert get_path({}, "missing", default="x") == "x"
def test_missing_nested_returns_default(self):
assert get_path({"a": {}}, "a.b.c", default=42) == 42
def test_default_is_none_when_omitted(self):
assert get_path({}, "nope") is None
def test_non_dict_intermediate_returns_default(self):
assert get_path({"a": "string"}, "a.b", default="d") == "d"
class TestSetPath:
def test_top_level_key(self):
result = set_path({}, "opt_out", True)
assert result == {"opt_out": True}
def test_nested_key_created(self):
result = set_path({}, "affiliate.opt_out", True)
assert result == {"affiliate": {"opt_out": True}}
def test_deeply_nested(self):
result = set_path({}, "affiliate.byok_ids.ebay", "my-tag")
assert result == {"affiliate": {"byok_ids": {"ebay": "my-tag"}}}
def test_preserves_sibling_keys(self):
data = {"affiliate": {"opt_out": False, "byok_ids": {}}}
result = set_path(data, "affiliate.opt_out", True)
assert result["affiliate"]["opt_out"] is True
assert result["affiliate"]["byok_ids"] == {}
def test_preserves_unrelated_top_level_keys(self):
data = {"other": "value", "affiliate": {"opt_out": False}}
result = set_path(data, "affiliate.opt_out", True)
assert result["other"] == "value"
def test_does_not_mutate_original(self):
data = {"affiliate": {"opt_out": False}}
set_path(data, "affiliate.opt_out", True)
assert data["affiliate"]["opt_out"] is False
def test_overwrites_existing_value(self):
data = {"affiliate": {"byok_ids": {"ebay": "old-tag"}}}
result = set_path(data, "affiliate.byok_ids.ebay", "new-tag")
assert result["affiliate"]["byok_ids"]["ebay"] == "new-tag"
def test_non_dict_intermediate_replaced(self):
data = {"affiliate": "not-a-dict"}
result = set_path(data, "affiliate.opt_out", True)
assert result == {"affiliate": {"opt_out": True}}
def test_roundtrip_get_after_set(self):
prefs = {}
prefs = set_path(prefs, "affiliate.opt_out", True)
prefs = set_path(prefs, "affiliate.byok_ids.ebay", "tag-123")
assert get_path(prefs, "affiliate.opt_out") is True
assert get_path(prefs, "affiliate.byok_ids.ebay") == "tag-123"
import os
import tempfile
from pathlib import Path
from circuitforge_core.preferences.store import LocalFileStore
class TestLocalFileStore:
def _store(self, tmp_path) -> LocalFileStore:
return LocalFileStore(prefs_path=tmp_path / "preferences.yaml")
def test_get_returns_default_when_file_missing(self, tmp_path):
store = self._store(tmp_path)
assert store.get(user_id=None, path="affiliate.opt_out", default=False) is False
def test_set_then_get_roundtrip(self, tmp_path):
store = self._store(tmp_path)
store.set(user_id=None, path="affiliate.opt_out", value=True)
assert store.get(user_id=None, path="affiliate.opt_out", default=False) is True
def test_set_nested_path(self, tmp_path):
store = self._store(tmp_path)
store.set(user_id=None, path="affiliate.byok_ids.ebay", value="my-tag")
assert store.get(user_id=None, path="affiliate.byok_ids.ebay") == "my-tag"
def test_set_preserves_sibling_keys(self, tmp_path):
store = self._store(tmp_path)
store.set(user_id=None, path="affiliate.opt_out", value=False)
store.set(user_id=None, path="affiliate.byok_ids.ebay", value="tag")
assert store.get(user_id=None, path="affiliate.opt_out") is False
assert store.get(user_id=None, path="affiliate.byok_ids.ebay") == "tag"
def test_creates_parent_dirs(self, tmp_path):
deep_path = tmp_path / "deep" / "nested" / "preferences.yaml"
store = LocalFileStore(prefs_path=deep_path)
store.set(user_id=None, path="x", value=1)
assert deep_path.exists()
def test_user_id_ignored_for_local_store(self, tmp_path):
"""LocalFileStore is single-user; user_id is accepted but ignored."""
store = self._store(tmp_path)
store.set(user_id="u123", path="affiliate.opt_out", value=True)
assert store.get(user_id="u456", path="affiliate.opt_out", default=False) is True
from circuitforge_core.preferences import get_user_preference, set_user_preference
class TestPreferenceHelpers:
def _store(self, tmp_path) -> LocalFileStore:
return LocalFileStore(prefs_path=tmp_path / "preferences.yaml")
def test_get_returns_default_when_unset(self, tmp_path):
store = self._store(tmp_path)
result = get_user_preference(user_id=None, path="affiliate.opt_out",
default=False, store=store)
assert result is False
def test_set_then_get(self, tmp_path):
store = self._store(tmp_path)
set_user_preference(user_id=None, path="affiliate.opt_out", value=True, store=store)
result = get_user_preference(user_id=None, path="affiliate.opt_out",
default=False, store=store)
assert result is True
def test_default_store_is_local(self, tmp_path, monkeypatch):
"""When no store is passed, helpers use LocalFileStore at default path."""
from circuitforge_core.preferences import store as store_module
local = self._store(tmp_path)
monkeypatch.setattr(store_module, "_DEFAULT_STORE", local)
set_user_preference(user_id=None, path="x.y", value=42)
assert get_user_preference(user_id=None, path="x.y") == 42

View file

@ -0,0 +1,148 @@
"""Tests for HeimdallAuthMiddleware — TTL cache and request gating."""
import time
import pytest
from unittest.mock import patch, MagicMock
from fastapi import FastAPI
from fastapi.testclient import TestClient
from circuitforge_core.resources.coordinator.auth import (
HeimdallAuthMiddleware,
_ValidationCache,
CACHE_TTL_S,
)
# ── Cache unit tests ──────────────────────────────────────────────────────────
def test_cache_miss_returns_none():
cache = _ValidationCache()
assert cache.get("nonexistent") is None
def test_cache_stores_and_retrieves():
cache = _ValidationCache()
cache.set("key1", valid=True, tier="paid", user_id="u1")
entry = cache.get("key1")
assert entry is not None
assert entry.valid is True
assert entry.tier == "paid"
def test_cache_entry_expires():
cache = _ValidationCache(ttl_s=0.05)
cache.set("key1", valid=True, tier="paid", user_id="u1")
time.sleep(0.1)
assert cache.get("key1") is None
def test_cache_evict_removes_key():
cache = _ValidationCache()
cache.set("key1", valid=True, tier="paid", user_id="u1")
cache.evict("key1")
assert cache.get("key1") is None
def test_cache_prune_removes_expired():
cache = _ValidationCache(ttl_s=0.05)
cache.set("k1", valid=True, tier="paid", user_id="")
cache.set("k2", valid=True, tier="paid", user_id="")
time.sleep(0.1)
removed = cache.prune()
assert removed == 2
# ── Middleware integration tests ──────────────────────────────────────────────
def _make_app_with_auth(middleware: HeimdallAuthMiddleware) -> TestClient:
app = FastAPI()
app.middleware("http")(middleware)
@app.get("/api/health")
def health():
return {"status": "ok"}
@app.post("/api/services/vllm/allocate")
def allocate():
return {"allocation_id": "abc", "url": "http://gpu:8000"}
return TestClient(app, raise_server_exceptions=False)
def _patched_middleware(valid: bool, tier: str = "paid") -> HeimdallAuthMiddleware:
"""Return a middleware whose Heimdall call is pre-mocked."""
mw = HeimdallAuthMiddleware(
heimdall_url="http://heimdall.test",
min_tier="paid",
)
mw._validate_against_heimdall = MagicMock( # type: ignore[method-assign]
return_value=(valid, tier, "user-1" if valid else "")
)
return mw
def test_health_exempt_no_auth_required():
mw = _patched_middleware(valid=True)
client = _make_app_with_auth(mw)
resp = client.get("/api/health")
assert resp.status_code == 200
def test_missing_auth_header_returns_401():
mw = _patched_middleware(valid=True)
client = _make_app_with_auth(mw)
resp = client.post("/api/services/vllm/allocate")
assert resp.status_code == 401
def test_invalid_key_returns_403():
mw = _patched_middleware(valid=False)
client = _make_app_with_auth(mw)
resp = client.post(
"/api/services/vllm/allocate",
headers={"Authorization": "Bearer BAD-KEY"},
)
assert resp.status_code == 403
def test_valid_paid_key_passes():
mw = _patched_middleware(valid=True, tier="paid")
client = _make_app_with_auth(mw)
resp = client.post(
"/api/services/vllm/allocate",
headers={"Authorization": "Bearer CFG-KIWI-GOOD-GOOD-GOOD"},
)
assert resp.status_code == 200
def test_free_tier_key_rejected_when_min_is_paid():
mw = _patched_middleware(valid=True, tier="free")
client = _make_app_with_auth(mw)
resp = client.post(
"/api/services/vllm/allocate",
headers={"Authorization": "Bearer CFG-KIWI-FREE-FREE-FREE"},
)
assert resp.status_code == 403
assert "paid" in resp.json()["detail"]
def test_cache_prevents_second_heimdall_call():
mw = _patched_middleware(valid=True, tier="paid")
client = _make_app_with_auth(mw)
key = "CFG-KIWI-CACHED-KEY-1"
headers = {"Authorization": f"Bearer {key}"}
client.post("/api/services/vllm/allocate", headers=headers)
client.post("/api/services/vllm/allocate", headers=headers)
# Heimdall should only have been called once — second hit is from cache
assert mw._validate_against_heimdall.call_count == 1 # type: ignore[attr-defined]
def test_from_env_returns_none_without_heimdall_url(monkeypatch):
monkeypatch.delenv("HEIMDALL_URL", raising=False)
assert HeimdallAuthMiddleware.from_env() is None
def test_from_env_returns_middleware_when_set(monkeypatch):
monkeypatch.setenv("HEIMDALL_URL", "http://heimdall.test")
mw = HeimdallAuthMiddleware.from_env()
assert mw is not None
assert mw._heimdall == "http://heimdall.test"