snipe/app/llm/query_translator.py
pyr0ball 1bf95bba2a feat(llm): migrate query_translator to cf-orch task endpoint for cloud, keep LLMRouter for local (#54)
QueryTranslator now supports two backends chosen at startup:
- CF_ORCH_URL set: allocate via POST /api/inference/task (product=snipe,
  task=query_translation), call the allocated cf-text service, release the
  slot in a finally block to guarantee the VRAM lease is freed.
- CF_ORCH_URL absent: existing LLMRouter path unchanged (ollama/vllm/api keys).

Also moves httpx from dev-only to main dependencies (already used by mcp/server.py).
2026-05-13 15:22:09 -07:00

231 lines
9.3 KiB
Python

# app/llm/query_translator.py
# BSL 1.1 License
"""LLM query builder — translates natural language to eBay SearchFilters.
Supports two backends, selected at construction time:
cforch_url — cf-orch task endpoint (cloud/premium). The coordinator resolves
product+task to a model and returns an allocation. The caller
POSTs to the allocated service URL, then DELETEs the allocation.
llm_router — circuitforge_core.LLMRouter (local installs: ollama/vllm/api keys).
At least one of cforch_url or llm_router must be supplied; when both are given, cforch_url takes precedence.
"""
from __future__ import annotations
import json
import logging
from dataclasses import dataclass
from typing import TYPE_CHECKING, Optional
import httpx
if TYPE_CHECKING:
from app.platforms.ebay.categories import EbayCategoryCache
log = logging.getLogger(__name__)
class QueryTranslatorError(Exception):
    """Signal that a natural-language query could not be translated.

    Used both when the LLM output cannot be parsed into a
    SearchParamsResponse and when the LLM backend call itself fails.

    Attributes:
        raw: The LLM text associated with the failure; defaults to "" when
            no output is available.
    """

    def __init__(self, message: str, raw: str = "") -> None:
        # Keep the offending LLM text on the exception for callers that want
        # to log or display it, then let Exception record the message.
        self.raw = raw
        super().__init__(message)
@dataclass(frozen=True)
class SearchParamsResponse:
    """Immutable parsed LLM response — maps 1:1 to the /api/search query parameters.

    Attributes:
        base_query: Primary search term.
        must_include_mode: One of "all", "any" or "groups".
        must_include: Raw filter string, interpreted per must_include_mode.
        must_exclude: Comma-separated exclusion terms.
        max_price: Upper price bound, or None when unbounded.
        min_price: Lower price bound, or None when unbounded.
        condition: Subset of ["new", "used", "for_parts"].
        category_id: eBay category ID string, or None.
        explanation: One-sentence plain-language summary.
    """

    base_query: str
    must_include_mode: str
    must_include: str
    must_exclude: str
    max_price: Optional[float]
    min_price: Optional[float]
    condition: list[str]
    category_id: Optional[str]
    explanation: str
# Accepted values for ``must_include_mode``; _parse_response falls back to
# "all" for anything else. Frozen so the shared constant cannot be mutated.
_VALID_MODES = frozenset({"all", "any", "groups"})
# Accepted entries for ``condition``; _parse_response drops anything else.
_VALID_CONDITIONS = frozenset({"new", "used", "for_parts"})
def _parse_response(raw: str) -> SearchParamsResponse:
    """Parse the LLM's raw text output into a SearchParamsResponse.

    The text must contain a single JSON object, optionally wrapped in a
    Markdown code fence. JSON ``null`` in a text field is treated as an empty
    string, an unknown ``must_include_mode`` falls back to "all", and unknown
    ``condition`` entries are dropped.

    Args:
        raw: Text returned by the LLM backend.

    Returns:
        The validated search parameters.

    Raises:
        QueryTranslatorError: If ``raw`` is not text, is not valid JSON, is
            not a JSON object, or a required field is missing or invalid.
    """
    if not isinstance(raw, str):
        # A backend can hand back None (e.g. a null message content); report
        # it with the module's error type instead of failing on .strip().
        raise QueryTranslatorError(
            f"LLM returned non-text output: {type(raw).__name__}", raw=""
        )

    text = raw.strip()
    if text.startswith("```"):
        # Models often wrap JSON in a ```json fence despite the prompt. Valid
        # JSON never starts with a backtick, so unwrapping only affects input
        # that would otherwise be rejected.
        text = text.partition("\n")[2].rstrip()
        if text.endswith("```"):
            text = text[:-3]
        text = text.strip()

    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        raise QueryTranslatorError(f"LLM returned unparseable JSON: {exc}", raw=raw) from exc

    def _text(value: object) -> str:
        # str(None) would yield the literal "None", which would then be used
        # as a search or filter term; map JSON null to "" instead.
        return "" if value is None else str(value)

    try:
        if not isinstance(data, dict):
            raise TypeError(f"expected a JSON object, got {type(data).__name__}")

        base_query = _text(data["base_query"]).strip()
        if not base_query:
            raise KeyError("base_query is empty")

        must_include_mode = _text(data.get("must_include_mode", "all")).strip().lower()
        if must_include_mode not in _VALID_MODES:
            must_include_mode = "all"

        must_include = _text(data.get("must_include"))
        must_exclude = _text(data.get("must_exclude"))

        max_price = float(data["max_price"]) if data.get("max_price") is not None else None
        min_price = float(data["min_price"]) if data.get("min_price") is not None else None

        raw_conditions = data.get("condition")
        if raw_conditions is None:
            # null means "any condition", same as an empty array.
            raw_conditions = []
        elif isinstance(raw_conditions, str):
            # Accept a bare string where a one-element array was intended;
            # iterating it directly would compare single characters.
            raw_conditions = [raw_conditions]
        elif not isinstance(raw_conditions, (list, tuple)):
            raise TypeError("condition must be an array")
        condition = [
            c for c in raw_conditions
            if isinstance(c, str) and c in _VALID_CONDITIONS
        ]

        raw_category = data.get("category_id")
        category_id = (str(raw_category).strip() or None) if raw_category else None

        explanation = _text(data.get("explanation")).strip()
    except (KeyError, TypeError, ValueError) as exc:
        raise QueryTranslatorError(
            f"LLM response missing or invalid field: {exc}", raw=raw
        ) from exc

    return SearchParamsResponse(
        base_query=base_query,
        must_include_mode=must_include_mode,
        must_include=must_include,
        must_exclude=must_exclude,
        max_price=max_price,
        min_price=min_price,
        condition=condition,
        category_id=category_id,
        explanation=explanation,
    )
# ── System prompt template ────────────────────────────────────────────────────
# System prompt sent to the LLM on every translation. It is rendered with
# str.format(category_hints=...) in QueryTranslator.translate, so
# {category_hints} is the only placeholder; any literal brace added to this
# text must be doubled ("{{" / "}}") or formatting will fail.
# The field list below mirrors what _parse_response reads from the reply.
# The backslash after the opening quotes suppresses a leading blank line.
_SYSTEM_PROMPT_TEMPLATE = """\
You are a search assistant for Snipe, an eBay listing intelligence tool.
Your job is to translate a natural-language description of what someone is looking for
into a structured eBay search configuration.
Return ONLY a JSON object with these exact fields — no preamble, no markdown, no extra keys:
base_query (string) Primary search term, short — e.g. "RTX 3080", "vintage Leica"
must_include_mode (string) One of: "all" (AND), "any" (OR), "groups" (CNF: pipe=OR within group, comma=AND between groups)
must_include (string) Filter string per mode — leave blank if nothing to filter
must_exclude (string) Comma-separated terms to exclude — e.g. "mining,for parts,broken"
max_price (number|null) Maximum price in USD, or null
min_price (number|null) Minimum price in USD, or null
condition (array) Any of: "new", "used", "for_parts" — empty array means any condition
category_id (string|null) eBay category ID from the list below, or null if no match
explanation (string) One plain sentence summarizing what you built
eBay "groups" mode syntax example: to find a GPU that is BOTH (nvidia OR amd) AND (16gb OR 8gb):
must_include_mode: "groups"
must_include: "nvidia|amd, 16gb|8gb"
Phrase "like new", "open box", "refurbished" -> condition: ["used"]
Phrase "broken", "for parts", "not working" -> condition: ["for_parts"]
If unsure about condition, use an empty array.
Available eBay categories (use category_id verbatim if one fits — otherwise omit):
{category_hints}
If none match, omit category_id (set to null). Respond with valid JSON only. No commentary outside the JSON object.
"""
# ── QueryTranslator ───────────────────────────────────────────────────────────
class QueryTranslator:
    """Translates natural-language search descriptions into SearchParamsResponse.

    Args:
        category_cache: An EbayCategoryCache instance (may have empty cache).
        cforch_url: cf-orch coordinator base URL (cloud/premium path). A
            trailing slash is tolerated; a blank value is treated as absent.
        llm_router: A circuitforge_core LLMRouter instance (local path).

    At least one of cforch_url or llm_router must be provided. When both are
    given, cforch_url takes precedence.

    Raises:
        ValueError: If neither a usable cforch_url nor an llm_router is given.
    """

    # Completion budget shared by both backends.
    _MAX_TOKENS = 512

    def __init__(
        self,
        category_cache: "EbayCategoryCache",
        *,
        cforch_url: str | None = None,
        llm_router: object | None = None,
    ) -> None:
        # Normalise the base URL once: a blank string (e.g. an env var read
        # with a "" default) counts as "not configured", and a trailing slash
        # would otherwise produce "//api/..." request paths.
        normalized_url = cforch_url.strip().rstrip("/") if cforch_url else ""
        if not normalized_url and llm_router is None:
            raise ValueError("Either cforch_url or llm_router must be provided")
        self._cache = category_cache
        self._cforch_url = normalized_url or None
        self._llm_router = llm_router

    def translate(self, natural_language: str) -> SearchParamsResponse:
        """Translate a natural-language query into a SearchParamsResponse.

        Args:
            natural_language: The user's free-text description.

        Returns:
            The parsed search parameters.

        Raises:
            QueryTranslatorError: If the query is blank, the LLM backend
                fails, or the backend returns output that cannot be parsed.
        """
        if not isinstance(natural_language, str) or not natural_language.strip():
            # Nothing to translate: fail fast rather than spend an allocation
            # and a model call on an empty prompt.
            raise QueryTranslatorError("Natural-language query is empty", raw="")

        # Category hint lookup: of the first 10 words, keep those longer than
        # two characters.
        keywords = [w for w in natural_language.split()[:10] if len(w) > 2]
        hints = self._cache.get_relevant(keywords, limit=30)
        if not hints:
            hints = self._cache.get_all_for_prompt(limit=40)
        if hints:
            category_hints = "\n".join(f"{cid}: {path}" for cid, path in hints)
        else:
            category_hints = "(no categories cached — omit category_id)"
        system_prompt = _SYSTEM_PROMPT_TEMPLATE.format(category_hints=category_hints)

        try:
            if self._cforch_url:
                raw = self._call_orch(system_prompt, natural_language)
            else:
                raw = self._call_local(system_prompt, natural_language)
        except QueryTranslatorError:
            raise
        except Exception as exc:
            raise QueryTranslatorError(
                f"LLM backend error: {exc}", raw=""
            ) from exc

        if not isinstance(raw, str):
            # e.g. a chat completion whose message content is null. Reject it
            # here so callers only ever see QueryTranslatorError.
            raise QueryTranslatorError(
                "LLM backend error: backend returned no text content", raw=""
            )
        return _parse_response(raw)

    def _call_orch(self, system_prompt: str, user_message: str) -> str:
        """Allocate via cf-orch task endpoint, call the model, release the slot.

        Errors from httpx or from unexpected response shapes propagate to the
        caller; translate() wraps them in QueryTranslatorError.
        """
        alloc_resp = httpx.post(
            f"{self._cforch_url}/api/inference/task",
            json={"product": "snipe", "task": "query_translation"},
            timeout=10.0,
        )
        alloc_resp.raise_for_status()
        alloc = alloc_resp.json()
        # Read the allocation id first: once it is known, every later failure
        # (including a missing "url") must still release the allocation.
        allocation_id = alloc["allocation_id"]
        try:
            service_url = str(alloc["url"]).rstrip("/")
            resp = httpx.post(
                f"{service_url}/v1/chat/completions",
                json={
                    "model": "__auto__",
                    "messages": [
                        {"role": "system", "content": system_prompt},
                        {"role": "user", "content": user_message},
                    ],
                    "max_tokens": self._MAX_TOKENS,
                },
                timeout=60.0,
            )
            resp.raise_for_status()
            return resp.json()["choices"][0]["message"]["content"]
        finally:
            self._release_allocation(allocation_id)

    def _release_allocation(self, allocation_id: object) -> None:
        """Best-effort release of a cf-orch allocation; never raises."""
        try:
            release_resp = httpx.delete(
                f"{self._cforch_url}/api/services/cf-text/allocations/{allocation_id}",
                timeout=5.0,
            )
            # An error status means the allocation was not released; surface
            # it in the log instead of ignoring the response.
            release_resp.raise_for_status()
        except Exception:
            # Deliberately swallowed: a failed release must not mask the
            # result or the original error of the model call.
            log.warning(
                "Failed to release cf-orch allocation %s",
                allocation_id,
                exc_info=True,
            )

    def _call_local(self, system_prompt: str, user_message: str) -> str:
        """Call the locally-configured LLMRouter (ollama/vllm/api keys)."""
        return self._llm_router.complete(  # type: ignore[union-attr]
            user_message,
            system=system_prompt,
            max_tokens=self._MAX_TOKENS,
        )