diff --git a/api/main.py b/api/main.py index 17d99ea..1710b8a 100644 --- a/api/main.py +++ b/api/main.py @@ -209,13 +209,21 @@ async def _lifespan(app: FastAPI): _category_cache.refresh(token_manager=None) # bootstrap fallback try: - from app.llm.router import LLMRouter - _llm_router = LLMRouter() - _query_translator = QueryTranslator( - category_cache=_category_cache, - llm_router=_llm_router, - ) - log.info("LLM query builder ready.") + cforch_url = os.getenv("CF_ORCH_URL") or None + if cforch_url: + _query_translator = QueryTranslator( + category_cache=_category_cache, + cforch_url=cforch_url, + ) + log.info("LLM query builder ready (cf-orch).") + else: + from app.llm.router import LLMRouter + _llm_router = LLMRouter() + _query_translator = QueryTranslator( + category_cache=_category_cache, + llm_router=_llm_router, + ) + log.info("LLM query builder ready (local LLM).") except Exception: log.info("No LLM backend configured — query builder disabled.") except Exception: @@ -2005,7 +2013,7 @@ async def build_search_query( if translator is None: raise HTTPException( status_code=503, - detail="No LLM backend configured. Set OLLAMA_HOST, ANTHROPIC_API_KEY, or OPENAI_API_KEY.", + detail="No LLM backend configured. Set CF_ORCH_URL (cloud) or OLLAMA_HOST / ANTHROPIC_API_KEY / OPENAI_API_KEY (local).", ) from app.llm.query_translator import QueryTranslatorError diff --git a/app/llm/query_translator.py b/app/llm/query_translator.py index 6b18c47..0d02f5c 100644 --- a/app/llm/query_translator.py +++ b/app/llm/query_translator.py @@ -2,9 +2,15 @@ # BSL 1.1 License """LLM query builder — translates natural language to eBay SearchFilters. -The QueryTranslator calls LLMRouter.complete() (synchronous) with a domain-aware -system prompt. The prompt includes category hints injected from EbayCategoryCache. -The LLM returns a single JSON object matching SearchParamsResponse. 
+Supports two backends, selected at construction time: + + cforch_url — cf-orch task endpoint (cloud/premium). The coordinator resolves + product+task to a model and returns an allocation. The caller + POSTs to the allocated service URL, then DELETEs the allocation. + + llm_router — circuitforge_core.LLMRouter (local installs: ollama/vllm/api keys). + +At least one of cforch_url or llm_router must be supplied; cforch_url takes precedence when both are given. """ from __future__ import annotations @@ -13,6 +19,8 @@ import logging from dataclasses import dataclass from typing import TYPE_CHECKING, Optional +import httpx + if TYPE_CHECKING: from app.platforms.ebay.categories import EbayCategoryCache @@ -128,11 +136,23 @@ class QueryTranslator: Args: category_cache: An EbayCategoryCache instance (may have empty cache). - llm_router: An LLMRouter instance from circuitforge_core. + cforch_url: cf-orch coordinator base URL (cloud/premium path). + llm_router: A circuitforge_core LLMRouter instance (local path). + + At least one of cforch_url or llm_router must be provided; cforch_url takes precedence when both are given. 
""" - def __init__(self, category_cache: "EbayCategoryCache", llm_router: object) -> None: + def __init__( + self, + category_cache: "EbayCategoryCache", + *, + cforch_url: str | None = None, + llm_router: object | None = None, + ) -> None: + if cforch_url is None and llm_router is None: + raise ValueError("Either cforch_url or llm_router must be provided") self._cache = category_cache + self._cforch_url = cforch_url self._llm_router = llm_router def translate(self, natural_language: str) -> SearchParamsResponse: @@ -154,14 +174,58 @@ class QueryTranslator: system_prompt = _SYSTEM_PROMPT_TEMPLATE.format(category_hints=category_hints) try: - raw = self._llm_router.complete( - natural_language, - system=system_prompt, - max_tokens=512, - ) + if self._cforch_url: + raw = self._call_orch(system_prompt, natural_language) + else: + raw = self._call_local(system_prompt, natural_language) + except QueryTranslatorError: + raise except Exception as exc: raise QueryTranslatorError( f"LLM backend error: {exc}", raw="" ) from exc return _parse_response(raw) + + def _call_orch(self, system_prompt: str, user_message: str) -> str: + """Allocate via cf-orch task endpoint, call the model, release the slot.""" + alloc_resp = httpx.post( + f"{self._cforch_url}/api/inference/task", + json={"product": "snipe", "task": "query_translation"}, + timeout=10.0, + ) + alloc_resp.raise_for_status() + alloc = alloc_resp.json() + service_url = alloc["url"] + allocation_id = alloc["allocation_id"] + try: + resp = httpx.post( + f"{service_url}/v1/chat/completions", + json={ + "model": "__auto__", + "messages": [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_message}, + ], + "max_tokens": 512, + }, + timeout=60.0, + ) + resp.raise_for_status() + return resp.json()["choices"][0]["message"]["content"] + finally: + try: + httpx.delete( + f"{self._cforch_url}/api/services/cf-text/allocations/{allocation_id}", + timeout=5.0, + ) + except Exception: + 
log.warning("Failed to release cf-orch allocation %s", allocation_id) + + def _call_local(self, system_prompt: str, user_message: str) -> str: + """Call the locally-configured LLMRouter (ollama/vllm/api keys).""" + return self._llm_router.complete( # type: ignore[union-attr] + user_message, + system=system_prompt, + max_tokens=512, + ) diff --git a/pyproject.toml b/pyproject.toml index adfde53..3af54b3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,6 +23,7 @@ dependencies = [ "playwright-stealth>=1.0", "cryptography>=42.0", "PyJWT>=2.8", + "httpx>=0.27", ] [project.optional-dependencies] @@ -30,7 +31,6 @@ dev = [ "pytest>=8.0", "pytest-cov>=5.0", "ruff>=0.4", - "httpx>=0.27", # FastAPI test client ] [tool.setuptools.packages.find] diff --git a/tests/test_query_translator.py b/tests/test_query_translator.py index 20f16eb..3841931 100644 --- a/tests/test_query_translator.py +++ b/tests/test_query_translator.py @@ -1,4 +1,4 @@ -"""Unit tests for QueryTranslator — LLMRouter mocked at boundary.""" +"""Unit tests for QueryTranslator — LLMRouter and cf-orch backends mocked at boundary.""" from __future__ import annotations import json @@ -73,7 +73,7 @@ def test_parse_response_missing_required_field(): _parse_response(raw) -# ── QueryTranslator (integration with mocked LLMRouter) ────────────────────── +# ── Fixtures ────────────────────────────────────────────────────────────────── from app.platforms.ebay.categories import EbayCategoryCache from circuitforge_core.db import get_connection, run_migrations @@ -88,7 +88,22 @@ def db_with_categories(tmp_path): return conn -def _make_translator(db_conn, llm_response: str) -> QueryTranslator: +_VALID_LLM_RESPONSE = json.dumps({ + "base_query": "RTX 3080", + "must_include_mode": "groups", + "must_include": "rtx|geforce, 3080", + "must_exclude": "mining,for parts", + "max_price": 300.0, + "min_price": None, + "condition": ["used"], + "category_id": "27386", + "explanation": "Searching for used RTX 3080 GPUs under $300.", 
+}) + + +# ── Local LLMRouter backend ─────────────────────────────────────────────────── + +def _make_local_translator(db_conn, llm_response: str) -> QueryTranslator: from app.platforms.ebay.categories import EbayCategoryCache cache = EbayCategoryCache(db_conn) mock_router = MagicMock() @@ -97,18 +112,7 @@ def _make_translator(db_conn, llm_response: str) -> QueryTranslator: def test_translate_returns_search_params(db_with_categories): - llm_out = json.dumps({ - "base_query": "RTX 3080", - "must_include_mode": "groups", - "must_include": "rtx|geforce, 3080", - "must_exclude": "mining,for parts", - "max_price": 300.0, - "min_price": None, - "condition": ["used"], - "category_id": "27386", - "explanation": "Searching for used RTX 3080 GPUs under $300.", - }) - t = _make_translator(db_with_categories, llm_out) + t = _make_local_translator(db_with_categories, _VALID_LLM_RESPONSE) result = t.translate("used RTX 3080 under $300 no mining") assert result.base_query == "RTX 3080" assert result.max_price == 300.0 @@ -116,18 +120,7 @@ def test_translate_returns_search_params(db_with_categories): def test_translate_injects_category_hints(db_with_categories): """The system prompt sent to the LLM must contain category_id hints.""" - llm_out = json.dumps({ - "base_query": "GPU", - "must_include_mode": "all", - "must_include": "", - "must_exclude": "", - "max_price": None, - "min_price": None, - "condition": [], - "category_id": None, - "explanation": "Searching for GPUs.", - }) - t = _make_translator(db_with_categories, llm_out) + t = _make_local_translator(db_with_categories, _VALID_LLM_RESPONSE) t.translate("GPU") call_args = t._llm_router.complete.call_args system_prompt = call_args.kwargs.get("system") or call_args.args[1] @@ -141,7 +134,7 @@ def test_translate_empty_category_cache_still_works(tmp_path): conn = get_connection(tmp_path / "empty.db") run_migrations(conn, Path("app/db/migrations")) # Do NOT seed bootstrap — empty cache - llm_out = json.dumps({ + t = 
_make_local_translator(conn, json.dumps({ "base_query": "vinyl", "must_include_mode": "all", "must_include": "", @@ -151,8 +144,7 @@ def test_translate_empty_category_cache_still_works(tmp_path): "condition": [], "category_id": None, "explanation": "Searching for vinyl records.", - }) - t = _make_translator(conn, llm_out) + })) result = t.translate("vinyl records") assert result.base_query == "vinyl" call_args = t._llm_router.complete.call_args @@ -168,3 +160,101 @@ def test_translate_llm_error_raises_query_translator_error(db_with_categories): t = QueryTranslator(category_cache=cache, llm_router=mock_router) with pytest.raises(QueryTranslatorError, match="LLM backend"): t.translate("used GPU") + + +# ── cf-orch backend ─────────────────────────────────────────────────────────── + +def _make_orch_translator(db_conn) -> QueryTranslator: + from app.platforms.ebay.categories import EbayCategoryCache + cache = EbayCategoryCache(db_conn) + return QueryTranslator(category_cache=cache, cforch_url="http://orch.local:8700") + + +def _mock_alloc_response() -> MagicMock: + resp = MagicMock() + resp.json.return_value = { + "url": "http://cf-text.local:11434", + "allocation_id": "alloc-abc123", + "node_id": "heimdall", + } + resp.raise_for_status.return_value = None + return resp + + +def _mock_chat_response(content: str) -> MagicMock: + resp = MagicMock() + resp.json.return_value = { + "choices": [{"message": {"content": content}}] + } + resp.raise_for_status.return_value = None + return resp + + +def _mock_delete_response() -> MagicMock: + resp = MagicMock() + resp.raise_for_status.return_value = None + return resp + + +def test_orch_translate_returns_search_params(db_with_categories): + t = _make_orch_translator(db_with_categories) + with patch("httpx.post") as mock_post, patch("httpx.delete") as mock_delete: + mock_post.side_effect = [ + _mock_alloc_response(), + _mock_chat_response(_VALID_LLM_RESPONSE), + ] + mock_delete.return_value = _mock_delete_response() + result = 
t.translate("used RTX 3080 under $300") + assert result.base_query == "RTX 3080" + assert result.max_price == 300.0 + + +def test_orch_allocates_with_correct_task_tag(db_with_categories): + t = _make_orch_translator(db_with_categories) + with patch("httpx.post") as mock_post, patch("httpx.delete"): + mock_post.side_effect = [ + _mock_alloc_response(), + _mock_chat_response(_VALID_LLM_RESPONSE), + ] + t.translate("GPU") + alloc_call = mock_post.call_args_list[0] + assert alloc_call.args[0] == "http://orch.local:8700/api/inference/task" + body = alloc_call.kwargs.get("json") or alloc_call.args[1] + assert body == {"product": "snipe", "task": "query_translation"} + + +def test_orch_releases_allocation_after_success(db_with_categories): + t = _make_orch_translator(db_with_categories) + with patch("httpx.post") as mock_post, patch("httpx.delete") as mock_delete: + mock_post.side_effect = [ + _mock_alloc_response(), + _mock_chat_response(_VALID_LLM_RESPONSE), + ] + mock_delete.return_value = _mock_delete_response() + t.translate("GPU") + mock_delete.assert_called_once() + delete_url = mock_delete.call_args.args[0] + assert "alloc-abc123" in delete_url + + +def test_orch_releases_allocation_on_inference_failure(db_with_categories): + """Allocation must be released even when the inference call fails.""" + t = _make_orch_translator(db_with_categories) + with patch("httpx.post") as mock_post, patch("httpx.delete") as mock_delete: + mock_post.side_effect = [ + _mock_alloc_response(), + Exception("inference timeout"), + ] + mock_delete.return_value = _mock_delete_response() + with pytest.raises(QueryTranslatorError, match="LLM backend"): + t.translate("GPU") + mock_delete.assert_called_once() + + +def test_init_requires_at_least_one_backend(tmp_path): + from circuitforge_core.db import get_connection, run_migrations + conn = get_connection(tmp_path / "test.db") + run_migrations(conn, Path("app/db/migrations")) + cache = EbayCategoryCache(conn) + with pytest.raises(ValueError, 
match="cforch_url or llm_router"): + QueryTranslator(category_cache=cache)