feat(streaming): add coordinator_proxy service module

This commit is contained in:
pyr0ball 2026-04-24 10:18:40 -07:00
parent c3e7dc1ea4
commit 0996ea8c7a
2 changed files with 105 additions and 0 deletions

View file

@ -0,0 +1,94 @@
"""cf-orch coordinator proxy client.
Calls the coordinator's /proxy/authorize endpoint to obtain a one-time
stream URL + token for LLM streaming. Always raises CoordinatorError on
failure callers decide how to handle it (stream-token endpoint returns
503 or 403 as appropriate).
"""
from __future__ import annotations
import logging
import os
from dataclasses import dataclass
import httpx
log = logging.getLogger(__name__)
class CoordinatorError(Exception):
"""Raised when the coordinator returns an error or is unreachable."""
def __init__(self, message: str, status_code: int = 503):
super().__init__(message)
self.status_code = status_code
@dataclass(frozen=True)
class StreamTokenResult:
stream_url: str
token: str
expires_in_s: int
def _coordinator_url() -> str:
return os.environ.get("COORDINATOR_URL", "http://10.1.10.71:7700")
def _product_key() -> str:
return os.environ.get("COORDINATOR_KIWI_KEY", "")
async def coordinator_authorize(
prompt: str,
caller: str = "kiwi-recipe",
ttl_s: int = 300,
) -> StreamTokenResult:
"""Call POST /proxy/authorize on the coordinator.
Returns a StreamTokenResult with the stream URL and one-time token.
Raises CoordinatorError on any failure (network, auth, capacity).
"""
url = f"{_coordinator_url()}/proxy/authorize"
key = _product_key()
if not key:
raise CoordinatorError(
"COORDINATOR_KIWI_KEY env var is not set — streaming unavailable",
status_code=503,
)
payload = {
"product": "kiwi",
"product_key": key,
"caller": caller,
"prompt": prompt,
"params": {},
"ttl_s": ttl_s,
}
try:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.post(url, json=payload)
except httpx.RequestError as exc:
log.warning("coordinator_authorize network error: %s", exc)
raise CoordinatorError(f"Coordinator unreachable: {exc}", status_code=503)
if resp.status_code == 401:
raise CoordinatorError("Invalid product key", status_code=401)
if resp.status_code == 429:
raise CoordinatorError("Too many concurrent streams", status_code=429)
if resp.status_code == 503:
raise CoordinatorError("No GPU available for streaming", status_code=503)
if not resp.is_success:
raise CoordinatorError(
f"Coordinator error {resp.status_code}: {resp.text[:200]}",
status_code=503,
)
data = resp.json()
# Use public_stream_url if coordinator provides it (cloud mode), else stream_url
stream_url = data.get("public_stream_url") or data["stream_url"]
return StreamTokenResult(
stream_url=stream_url,
token=data["token"],
expires_in_s=data["expires_in_s"],
)

View file

@ -0,0 +1,11 @@
"""Tests for POST /api/v1/recipes/stream-token — coordinator proxy integration."""
import pytest
from unittest.mock import AsyncMock, patch
def test_coordinator_authorize_missing_url(monkeypatch):
"""coordinator_authorize raises RuntimeError when COORDINATOR_URL is unset."""
monkeypatch.delenv("COORDINATOR_URL", raising=False)
monkeypatch.delenv("COORDINATOR_KIWI_KEY", raising=False)
# Will test this properly via endpoint — see Task 3 tests.
pass