Compare commits
3 commits
ae0d4fbc89
...
4dd44fdafb
| Author | SHA1 | Date | |
|---|---|---|---|
| 4dd44fdafb | |||
| 263c8522ee | |||
| 1bf95bba2a |
7 changed files with 470 additions and 97 deletions
|
|
@ -8,6 +8,7 @@
|
|||
|
||||
[](LICENSE)
|
||||
[]()
|
||||
[](https://git.opensourcesolarpunk.com/Circuit-Forge/snipe/releases)
|
||||
[](https://git.opensourcesolarpunk.com/Circuit-Forge/snipe)
|
||||
[](https://docs.circuitforge.tech/snipe)
|
||||
|
||||
|
|
|
|||
24
api/main.py
24
api/main.py
|
|
@ -209,13 +209,21 @@ async def _lifespan(app: FastAPI):
|
|||
_category_cache.refresh(token_manager=None) # bootstrap fallback
|
||||
|
||||
try:
|
||||
from app.llm.router import LLMRouter
|
||||
_llm_router = LLMRouter()
|
||||
_query_translator = QueryTranslator(
|
||||
category_cache=_category_cache,
|
||||
llm_router=_llm_router,
|
||||
)
|
||||
log.info("LLM query builder ready.")
|
||||
cforch_url = os.getenv("CF_ORCH_URL") or None
|
||||
if cforch_url:
|
||||
_query_translator = QueryTranslator(
|
||||
category_cache=_category_cache,
|
||||
cforch_url=cforch_url,
|
||||
)
|
||||
log.info("LLM query builder ready (cf-orch).")
|
||||
else:
|
||||
from app.llm.router import LLMRouter
|
||||
_llm_router = LLMRouter()
|
||||
_query_translator = QueryTranslator(
|
||||
category_cache=_category_cache,
|
||||
llm_router=_llm_router,
|
||||
)
|
||||
log.info("LLM query builder ready (local LLM).")
|
||||
except Exception:
|
||||
log.info("No LLM backend configured — query builder disabled.")
|
||||
except Exception:
|
||||
|
|
@ -2005,7 +2013,7 @@ async def build_search_query(
|
|||
if translator is None:
|
||||
raise HTTPException(
|
||||
status_code=503,
|
||||
detail="No LLM backend configured. Set OLLAMA_HOST, ANTHROPIC_API_KEY, or OPENAI_API_KEY.",
|
||||
detail="No LLM backend configured. Set CF_ORCH_URL (cloud) or OLLAMA_HOST / ANTHROPIC_API_KEY / OPENAI_API_KEY (local).",
|
||||
)
|
||||
|
||||
from app.llm.query_translator import QueryTranslatorError
|
||||
|
|
|
|||
|
|
@ -2,9 +2,15 @@
|
|||
# BSL 1.1 License
|
||||
"""LLM query builder — translates natural language to eBay SearchFilters.
|
||||
|
||||
The QueryTranslator calls LLMRouter.complete() (synchronous) with a domain-aware
|
||||
system prompt. The prompt includes category hints injected from EbayCategoryCache.
|
||||
The LLM returns a single JSON object matching SearchParamsResponse.
|
||||
Supports two backends, selected at construction time:
|
||||
|
||||
cforch_url — cf-orch task endpoint (cloud/premium). The coordinator resolves
|
||||
product+task to a model and returns an allocation. The caller
|
||||
POSTs to the allocated service URL, then DELETEs the allocation.
|
||||
|
||||
llm_router — circuitforge_core.LLMRouter (local installs: ollama/vllm/api keys).
|
||||
|
||||
Exactly one of cforch_url or llm_router must be supplied.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
|
|
@ -13,6 +19,8 @@ import logging
|
|||
from dataclasses import dataclass
|
||||
from typing import TYPE_CHECKING, Optional
|
||||
|
||||
import httpx
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from app.platforms.ebay.categories import EbayCategoryCache
|
||||
|
||||
|
|
@ -128,11 +136,23 @@ class QueryTranslator:
|
|||
|
||||
Args:
|
||||
category_cache: An EbayCategoryCache instance (may have empty cache).
|
||||
llm_router: An LLMRouter instance from circuitforge_core.
|
||||
cforch_url: cf-orch coordinator base URL (cloud/premium path).
|
||||
llm_router: A circuitforge_core LLMRouter instance (local path).
|
||||
|
||||
Exactly one of cforch_url or llm_router must be provided.
|
||||
"""
|
||||
|
||||
def __init__(self, category_cache: "EbayCategoryCache", llm_router: object) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
category_cache: "EbayCategoryCache",
|
||||
*,
|
||||
cforch_url: str | None = None,
|
||||
llm_router: object | None = None,
|
||||
) -> None:
|
||||
if cforch_url is None and llm_router is None:
|
||||
raise ValueError("Either cforch_url or llm_router must be provided")
|
||||
self._cache = category_cache
|
||||
self._cforch_url = cforch_url
|
||||
self._llm_router = llm_router
|
||||
|
||||
def translate(self, natural_language: str) -> SearchParamsResponse:
|
||||
|
|
@ -154,14 +174,58 @@ class QueryTranslator:
|
|||
system_prompt = _SYSTEM_PROMPT_TEMPLATE.format(category_hints=category_hints)
|
||||
|
||||
try:
|
||||
raw = self._llm_router.complete(
|
||||
natural_language,
|
||||
system=system_prompt,
|
||||
max_tokens=512,
|
||||
)
|
||||
if self._cforch_url:
|
||||
raw = self._call_orch(system_prompt, natural_language)
|
||||
else:
|
||||
raw = self._call_local(system_prompt, natural_language)
|
||||
except QueryTranslatorError:
|
||||
raise
|
||||
except Exception as exc:
|
||||
raise QueryTranslatorError(
|
||||
f"LLM backend error: {exc}", raw=""
|
||||
) from exc
|
||||
|
||||
return _parse_response(raw)
|
||||
|
||||
def _call_orch(self, system_prompt: str, user_message: str) -> str:
|
||||
"""Allocate via cf-orch task endpoint, call the model, release the slot."""
|
||||
alloc_resp = httpx.post(
|
||||
f"{self._cforch_url}/api/inference/task",
|
||||
json={"product": "snipe", "task": "query_translation"},
|
||||
timeout=10.0,
|
||||
)
|
||||
alloc_resp.raise_for_status()
|
||||
alloc = alloc_resp.json()
|
||||
service_url = alloc["url"]
|
||||
allocation_id = alloc["allocation_id"]
|
||||
try:
|
||||
resp = httpx.post(
|
||||
f"{service_url}/v1/chat/completions",
|
||||
json={
|
||||
"model": "__auto__",
|
||||
"messages": [
|
||||
{"role": "system", "content": system_prompt},
|
||||
{"role": "user", "content": user_message},
|
||||
],
|
||||
"max_tokens": 512,
|
||||
},
|
||||
timeout=60.0,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
return resp.json()["choices"][0]["message"]["content"]
|
||||
finally:
|
||||
try:
|
||||
httpx.delete(
|
||||
f"{self._cforch_url}/api/services/cf-text/allocations/{allocation_id}",
|
||||
timeout=5.0,
|
||||
)
|
||||
except Exception:
|
||||
log.warning("Failed to release cf-orch allocation %s", allocation_id)
|
||||
|
||||
def _call_local(self, system_prompt: str, user_message: str) -> str:
|
||||
"""Call the locally-configured LLMRouter (ollama/vllm/api keys)."""
|
||||
return self._llm_router.complete( # type: ignore[union-attr]
|
||||
user_message,
|
||||
system=system_prompt,
|
||||
max_tokens=512,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -7,28 +7,30 @@ Current task types:
|
|||
trust_photo_analysis — download primary photo, run vision LLM, write
|
||||
result to trust_scores.photo_analysis_json (Paid tier).
|
||||
|
||||
Prompt note: The vision prompt is a functional first pass. Tune against real
|
||||
eBay listings before GA — specifically stock-photo vs genuine-product distinction
|
||||
and the damage vocabulary.
|
||||
Image assessment routing:
|
||||
Cloud (CF_ORCH_URL set): allocates via cf-orch task endpoint
|
||||
product=snipe, task=image_assessment.
|
||||
Local (no CF_ORCH_URL) or TaskNotFound fallback: uses LLMRouter
|
||||
with a vision-capable local backend (moondream2, llava, etc.).
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
import httpx
|
||||
import requests
|
||||
from circuitforge_core.db import get_connection
|
||||
from circuitforge_core.llm import LLMRouter
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
LLM_TASK_TYPES: frozenset[str] = frozenset({"trust_photo_analysis"})
|
||||
|
||||
VRAM_BUDGETS: dict[str, float] = {
|
||||
# moondream2 / vision-capable LLM — single image, short response
|
||||
"trust_photo_analysis": 2.0,
|
||||
"trust_photo_analysis": 6000, # Q5_K_M Qwen2-VL via cf-orch; LLMRouter fallback uses 2.0 GB
|
||||
}
|
||||
|
||||
_VISION_SYSTEM_PROMPT = (
|
||||
|
|
@ -51,8 +53,7 @@ def insert_task(
|
|||
) -> tuple[int, bool]:
|
||||
"""Insert a background task if no identical task is already in-flight.
|
||||
|
||||
Uses get_connection() so WAL mode and timeout=30 apply — same as all other
|
||||
Snipe DB access. Returns (task_id, is_new).
|
||||
Returns (task_id, is_new).
|
||||
"""
|
||||
conn = get_connection(db_path)
|
||||
conn.row_factory = __import__("sqlite3").Row
|
||||
|
|
@ -120,32 +121,26 @@ def _run_trust_photo_analysis(
|
|||
p = json.loads(params or "{}")
|
||||
photo_url = p.get("photo_url", "")
|
||||
listing_title = p.get("listing_title", "")
|
||||
# user_db: per-user DB in cloud mode; same as db_path in local mode.
|
||||
result_db = Path(p.get("user_db", str(db_path)))
|
||||
|
||||
if not photo_url:
|
||||
raise ValueError("trust_photo_analysis: 'photo_url' is required in params")
|
||||
|
||||
# Download and base64-encode the photo
|
||||
resp = requests.get(photo_url, timeout=10)
|
||||
resp.raise_for_status()
|
||||
image_b64 = base64.b64encode(resp.content).decode()
|
||||
image_data_url = f"data:image/jpeg;base64,{image_b64}"
|
||||
|
||||
# Build user prompt with optional title context
|
||||
user_prompt = "Evaluate this eBay listing photo."
|
||||
user_prompt = "Assess this listing image."
|
||||
if listing_title:
|
||||
user_prompt = f"Evaluate this eBay listing photo for: {listing_title}"
|
||||
user_prompt = f"Assess this eBay listing image: {listing_title}"
|
||||
|
||||
# Call LLMRouter with vision capability
|
||||
router = LLMRouter()
|
||||
raw = router.complete(
|
||||
user_prompt,
|
||||
system=_VISION_SYSTEM_PROMPT,
|
||||
images=[image_b64],
|
||||
max_tokens=128,
|
||||
)
|
||||
cforch_url = os.getenv("CF_ORCH_URL")
|
||||
if cforch_url:
|
||||
raw = _assess_via_orch(cforch_url, image_data_url, user_prompt)
|
||||
else:
|
||||
raw = _assess_via_local_llm(image_b64, user_prompt)
|
||||
|
||||
# Parse — be lenient: strip markdown fences if present
|
||||
try:
|
||||
cleaned = raw.strip().removeprefix("```json").removeprefix("```").removesuffix("```").strip()
|
||||
analysis = json.loads(cleaned)
|
||||
|
|
@ -168,3 +163,54 @@ def _run_trust_photo_analysis(
|
|||
analysis.get("visible_damage"),
|
||||
analysis.get("confidence"),
|
||||
)
|
||||
|
||||
|
||||
def _assess_via_orch(cforch_url: str, image_data_url: str, user_prompt: str) -> str:
|
||||
"""Run photo assessment via cf-orch task endpoint (cloud path)."""
|
||||
from circuitforge_orch.client import CFOrchClient, TaskNotFound
|
||||
|
||||
client = CFOrchClient(cforch_url)
|
||||
try:
|
||||
with client.task_allocate("snipe", "image_assessment") as alloc:
|
||||
resp = httpx.post(
|
||||
f"{alloc.url}/v1/chat/completions",
|
||||
json={
|
||||
"model": alloc.model or "__auto__",
|
||||
"messages": [
|
||||
{
|
||||
"role": "system",
|
||||
"content": _VISION_SYSTEM_PROMPT,
|
||||
},
|
||||
{
|
||||
"role": "user",
|
||||
"content": [
|
||||
{"type": "image_url", "image_url": {"url": image_data_url}},
|
||||
{"type": "text", "text": user_prompt},
|
||||
],
|
||||
},
|
||||
],
|
||||
"max_tokens": 128,
|
||||
},
|
||||
timeout=60.0,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
return resp.json()["choices"][0]["message"]["content"]
|
||||
except TaskNotFound:
|
||||
log.warning(
|
||||
"snipe.image_assessment not registered in cf-orch — falling back to local LLM"
|
||||
)
|
||||
image_b64 = image_data_url.split(",", 1)[1]
|
||||
return _assess_via_local_llm(image_b64, user_prompt)
|
||||
|
||||
|
||||
def _assess_via_local_llm(image_b64: str, user_prompt: str) -> str:
|
||||
"""Run photo assessment via local LLMRouter (local/self-hosted path)."""
|
||||
from app.llm.router import LLMRouter
|
||||
|
||||
router = LLMRouter()
|
||||
return router.complete(
|
||||
user_prompt,
|
||||
system=_VISION_SYSTEM_PROMPT,
|
||||
images=[image_b64],
|
||||
max_tokens=128,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -23,6 +23,8 @@ dependencies = [
|
|||
"playwright-stealth>=1.0",
|
||||
"cryptography>=42.0",
|
||||
"PyJWT>=2.8",
|
||||
"httpx>=0.27",
|
||||
"circuitforge-orch>=0.1.0",
|
||||
]
|
||||
|
||||
[project.optional-dependencies]
|
||||
|
|
@ -30,7 +32,6 @@ dev = [
|
|||
"pytest>=8.0",
|
||||
"pytest-cov>=5.0",
|
||||
"ruff>=0.4",
|
||||
"httpx>=0.27", # FastAPI test client
|
||||
]
|
||||
|
||||
[tool.setuptools.packages.find]
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
"""Unit tests for QueryTranslator — LLMRouter mocked at boundary."""
|
||||
"""Unit tests for QueryTranslator — LLMRouter and cf-orch backends mocked at boundary."""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
|
|
@ -73,7 +73,7 @@ def test_parse_response_missing_required_field():
|
|||
_parse_response(raw)
|
||||
|
||||
|
||||
# ── QueryTranslator (integration with mocked LLMRouter) ──────────────────────
|
||||
# ── Fixtures ──────────────────────────────────────────────────────────────────
|
||||
|
||||
from app.platforms.ebay.categories import EbayCategoryCache
|
||||
from circuitforge_core.db import get_connection, run_migrations
|
||||
|
|
@ -88,7 +88,22 @@ def db_with_categories(tmp_path):
|
|||
return conn
|
||||
|
||||
|
||||
def _make_translator(db_conn, llm_response: str) -> QueryTranslator:
|
||||
_VALID_LLM_RESPONSE = json.dumps({
|
||||
"base_query": "RTX 3080",
|
||||
"must_include_mode": "groups",
|
||||
"must_include": "rtx|geforce, 3080",
|
||||
"must_exclude": "mining,for parts",
|
||||
"max_price": 300.0,
|
||||
"min_price": None,
|
||||
"condition": ["used"],
|
||||
"category_id": "27386",
|
||||
"explanation": "Searching for used RTX 3080 GPUs under $300.",
|
||||
})
|
||||
|
||||
|
||||
# ── Local LLMRouter backend ───────────────────────────────────────────────────
|
||||
|
||||
def _make_local_translator(db_conn, llm_response: str) -> QueryTranslator:
|
||||
from app.platforms.ebay.categories import EbayCategoryCache
|
||||
cache = EbayCategoryCache(db_conn)
|
||||
mock_router = MagicMock()
|
||||
|
|
@ -97,18 +112,7 @@ def _make_translator(db_conn, llm_response: str) -> QueryTranslator:
|
|||
|
||||
|
||||
def test_translate_returns_search_params(db_with_categories):
|
||||
llm_out = json.dumps({
|
||||
"base_query": "RTX 3080",
|
||||
"must_include_mode": "groups",
|
||||
"must_include": "rtx|geforce, 3080",
|
||||
"must_exclude": "mining,for parts",
|
||||
"max_price": 300.0,
|
||||
"min_price": None,
|
||||
"condition": ["used"],
|
||||
"category_id": "27386",
|
||||
"explanation": "Searching for used RTX 3080 GPUs under $300.",
|
||||
})
|
||||
t = _make_translator(db_with_categories, llm_out)
|
||||
t = _make_local_translator(db_with_categories, _VALID_LLM_RESPONSE)
|
||||
result = t.translate("used RTX 3080 under $300 no mining")
|
||||
assert result.base_query == "RTX 3080"
|
||||
assert result.max_price == 300.0
|
||||
|
|
@ -116,18 +120,7 @@ def test_translate_returns_search_params(db_with_categories):
|
|||
|
||||
def test_translate_injects_category_hints(db_with_categories):
|
||||
"""The system prompt sent to the LLM must contain category_id hints."""
|
||||
llm_out = json.dumps({
|
||||
"base_query": "GPU",
|
||||
"must_include_mode": "all",
|
||||
"must_include": "",
|
||||
"must_exclude": "",
|
||||
"max_price": None,
|
||||
"min_price": None,
|
||||
"condition": [],
|
||||
"category_id": None,
|
||||
"explanation": "Searching for GPUs.",
|
||||
})
|
||||
t = _make_translator(db_with_categories, llm_out)
|
||||
t = _make_local_translator(db_with_categories, _VALID_LLM_RESPONSE)
|
||||
t.translate("GPU")
|
||||
call_args = t._llm_router.complete.call_args
|
||||
system_prompt = call_args.kwargs.get("system") or call_args.args[1]
|
||||
|
|
@ -141,7 +134,7 @@ def test_translate_empty_category_cache_still_works(tmp_path):
|
|||
conn = get_connection(tmp_path / "empty.db")
|
||||
run_migrations(conn, Path("app/db/migrations"))
|
||||
# Do NOT seed bootstrap — empty cache
|
||||
llm_out = json.dumps({
|
||||
t = _make_local_translator(conn, json.dumps({
|
||||
"base_query": "vinyl",
|
||||
"must_include_mode": "all",
|
||||
"must_include": "",
|
||||
|
|
@ -151,8 +144,7 @@ def test_translate_empty_category_cache_still_works(tmp_path):
|
|||
"condition": [],
|
||||
"category_id": None,
|
||||
"explanation": "Searching for vinyl records.",
|
||||
})
|
||||
t = _make_translator(conn, llm_out)
|
||||
}))
|
||||
result = t.translate("vinyl records")
|
||||
assert result.base_query == "vinyl"
|
||||
call_args = t._llm_router.complete.call_args
|
||||
|
|
@ -168,3 +160,101 @@ def test_translate_llm_error_raises_query_translator_error(db_with_categories):
|
|||
t = QueryTranslator(category_cache=cache, llm_router=mock_router)
|
||||
with pytest.raises(QueryTranslatorError, match="LLM backend"):
|
||||
t.translate("used GPU")
|
||||
|
||||
|
||||
# ── cf-orch backend ───────────────────────────────────────────────────────────
|
||||
|
||||
def _make_orch_translator(db_conn) -> QueryTranslator:
|
||||
from app.platforms.ebay.categories import EbayCategoryCache
|
||||
cache = EbayCategoryCache(db_conn)
|
||||
return QueryTranslator(category_cache=cache, cforch_url="http://orch.local:8700")
|
||||
|
||||
|
||||
def _mock_alloc_response() -> MagicMock:
|
||||
resp = MagicMock()
|
||||
resp.json.return_value = {
|
||||
"url": "http://cf-text.local:11434",
|
||||
"allocation_id": "alloc-abc123",
|
||||
"node_id": "heimdall",
|
||||
}
|
||||
resp.raise_for_status.return_value = None
|
||||
return resp
|
||||
|
||||
|
||||
def _mock_chat_response(content: str) -> MagicMock:
|
||||
resp = MagicMock()
|
||||
resp.json.return_value = {
|
||||
"choices": [{"message": {"content": content}}]
|
||||
}
|
||||
resp.raise_for_status.return_value = None
|
||||
return resp
|
||||
|
||||
|
||||
def _mock_delete_response() -> MagicMock:
|
||||
resp = MagicMock()
|
||||
resp.raise_for_status.return_value = None
|
||||
return resp
|
||||
|
||||
|
||||
def test_orch_translate_returns_search_params(db_with_categories):
|
||||
t = _make_orch_translator(db_with_categories)
|
||||
with patch("httpx.post") as mock_post, patch("httpx.delete") as mock_delete:
|
||||
mock_post.side_effect = [
|
||||
_mock_alloc_response(),
|
||||
_mock_chat_response(_VALID_LLM_RESPONSE),
|
||||
]
|
||||
mock_delete.return_value = _mock_delete_response()
|
||||
result = t.translate("used RTX 3080 under $300")
|
||||
assert result.base_query == "RTX 3080"
|
||||
assert result.max_price == 300.0
|
||||
|
||||
|
||||
def test_orch_allocates_with_correct_task_tag(db_with_categories):
|
||||
t = _make_orch_translator(db_with_categories)
|
||||
with patch("httpx.post") as mock_post, patch("httpx.delete"):
|
||||
mock_post.side_effect = [
|
||||
_mock_alloc_response(),
|
||||
_mock_chat_response(_VALID_LLM_RESPONSE),
|
||||
]
|
||||
t.translate("GPU")
|
||||
alloc_call = mock_post.call_args_list[0]
|
||||
assert alloc_call.args[0] == "http://orch.local:8700/api/inference/task"
|
||||
body = alloc_call.kwargs.get("json") or alloc_call.args[1]
|
||||
assert body == {"product": "snipe", "task": "query_translation"}
|
||||
|
||||
|
||||
def test_orch_releases_allocation_after_success(db_with_categories):
|
||||
t = _make_orch_translator(db_with_categories)
|
||||
with patch("httpx.post") as mock_post, patch("httpx.delete") as mock_delete:
|
||||
mock_post.side_effect = [
|
||||
_mock_alloc_response(),
|
||||
_mock_chat_response(_VALID_LLM_RESPONSE),
|
||||
]
|
||||
mock_delete.return_value = _mock_delete_response()
|
||||
t.translate("GPU")
|
||||
mock_delete.assert_called_once()
|
||||
delete_url = mock_delete.call_args.args[0]
|
||||
assert "alloc-abc123" in delete_url
|
||||
|
||||
|
||||
def test_orch_releases_allocation_on_inference_failure(db_with_categories):
|
||||
"""Allocation must be released even when the inference call fails."""
|
||||
t = _make_orch_translator(db_with_categories)
|
||||
with patch("httpx.post") as mock_post, patch("httpx.delete") as mock_delete:
|
||||
mock_post.side_effect = [
|
||||
_mock_alloc_response(),
|
||||
Exception("inference timeout"),
|
||||
]
|
||||
mock_delete.return_value = _mock_delete_response()
|
||||
with pytest.raises(QueryTranslatorError, match="LLM backend"):
|
||||
t.translate("GPU")
|
||||
mock_delete.assert_called_once()
|
||||
|
||||
|
||||
def test_init_requires_at_least_one_backend(tmp_path):
|
||||
from circuitforge_core.db import get_connection, run_migrations
|
||||
conn = get_connection(tmp_path / "test.db")
|
||||
run_migrations(conn, Path("app/db/migrations"))
|
||||
cache = EbayCategoryCache(conn)
|
||||
with pytest.raises(ValueError, match="cforch_url or llm_router"):
|
||||
QueryTranslator(category_cache=cache)
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ from __future__ import annotations
|
|||
import json
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
from unittest.mock import MagicMock, patch, call
|
||||
|
||||
import pytest
|
||||
|
||||
|
|
@ -47,6 +47,19 @@ def tmp_db(tmp_path: Path) -> Path:
|
|||
return db
|
||||
|
||||
|
||||
_VISION_JSON = json.dumps({
|
||||
"is_stock_photo": False,
|
||||
"visible_damage": False,
|
||||
"authenticity_signal": "genuine_product_photo",
|
||||
"confidence": "high",
|
||||
})
|
||||
|
||||
_PARAMS = json.dumps({
|
||||
"photo_url": "https://example.com/photo.jpg",
|
||||
"listing_title": "Used iPhone 13",
|
||||
})
|
||||
|
||||
|
||||
def test_llm_task_types_defined():
|
||||
assert "trust_photo_analysis" in LLM_TASK_TYPES
|
||||
|
||||
|
|
@ -75,29 +88,17 @@ def test_insert_task_dedup(tmp_db: Path):
|
|||
assert new2 is False
|
||||
|
||||
|
||||
def test_run_task_photo_analysis_success(tmp_db: Path):
|
||||
"""Vision analysis result is written to trust_scores.photo_analysis_json."""
|
||||
params = json.dumps({
|
||||
"listing_id": 1,
|
||||
"photo_url": "https://example.com/photo.jpg",
|
||||
"listing_title": "Used iPhone 13",
|
||||
})
|
||||
task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=params)
|
||||
# ── Local LLMRouter path ──────────────────────────────────────────────────────
|
||||
|
||||
vision_result = {
|
||||
"is_stock_photo": False,
|
||||
"visible_damage": False,
|
||||
"authenticity_signal": "genuine_product_photo",
|
||||
"confidence": "high",
|
||||
}
|
||||
def test_run_task_photo_analysis_local_success(tmp_db: Path):
|
||||
"""Local path: vision result is written to trust_scores.photo_analysis_json."""
|
||||
task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS)
|
||||
|
||||
with patch("app.tasks.runner.requests") as mock_req, \
|
||||
patch("app.tasks.runner.LLMRouter") as MockRouter:
|
||||
patch("app.tasks.runner._assess_via_local_llm", return_value=_VISION_JSON):
|
||||
mock_req.get.return_value.content = b"fake_image_bytes"
|
||||
mock_req.get.return_value.raise_for_status = lambda: None
|
||||
instance = MockRouter.return_value
|
||||
instance.complete.return_value = json.dumps(vision_result)
|
||||
run_task(tmp_db, task_id, "trust_photo_analysis", 1, params)
|
||||
run_task(tmp_db, task_id, "trust_photo_analysis", 1, _PARAMS)
|
||||
|
||||
conn = sqlite3.connect(tmp_db)
|
||||
score_row = conn.execute(
|
||||
|
|
@ -110,20 +111,16 @@ def test_run_task_photo_analysis_success(tmp_db: Path):
|
|||
assert task_row[0] == "completed"
|
||||
parsed = json.loads(score_row[0])
|
||||
assert parsed["is_stock_photo"] is False
|
||||
assert parsed["confidence"] == "high"
|
||||
|
||||
|
||||
def test_run_task_photo_fetch_failure_marks_failed(tmp_db: Path):
|
||||
"""If photo download fails, task is marked failed without crashing."""
|
||||
params = json.dumps({
|
||||
"listing_id": 1,
|
||||
"photo_url": "https://example.com/bad.jpg",
|
||||
"listing_title": "Laptop",
|
||||
})
|
||||
task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=params)
|
||||
task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS)
|
||||
|
||||
with patch("app.tasks.runner.requests") as mock_req:
|
||||
mock_req.get.side_effect = ConnectionError("fetch failed")
|
||||
run_task(tmp_db, task_id, "trust_photo_analysis", 1, params)
|
||||
run_task(tmp_db, task_id, "trust_photo_analysis", 1, _PARAMS)
|
||||
|
||||
conn = sqlite3.connect(tmp_db)
|
||||
row = conn.execute(
|
||||
|
|
@ -156,3 +153,169 @@ def test_run_task_unknown_type_marks_failed(tmp_db: Path):
|
|||
).fetchone()
|
||||
conn.close()
|
||||
assert row[0] == "failed"
|
||||
|
||||
|
||||
# ── cf-orch path ──────────────────────────────────────────────────────────────
|
||||
|
||||
def _make_orch_client_mock(vision_json: str) -> MagicMock:
|
||||
"""Build a CFOrchClient mock whose task_allocate context manager returns an Allocation."""
|
||||
alloc = MagicMock()
|
||||
alloc.url = "http://cf-vlm.local:8000"
|
||||
alloc.model = "bartowski--qwen2-vl-7b-instruct-gguf"
|
||||
|
||||
cm = MagicMock()
|
||||
cm.__enter__ = MagicMock(return_value=alloc)
|
||||
cm.__exit__ = MagicMock(return_value=False)
|
||||
|
||||
client = MagicMock()
|
||||
client.task_allocate.return_value = cm
|
||||
return client
|
||||
|
||||
|
||||
def test_run_task_photo_analysis_orch_success(tmp_db: Path):
|
||||
"""Cloud path: CFOrchClient.task_allocate is used when CF_ORCH_URL is set."""
|
||||
task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS)
|
||||
|
||||
chat_resp = MagicMock()
|
||||
chat_resp.json.return_value = {"choices": [{"message": {"content": _VISION_JSON}}]}
|
||||
chat_resp.raise_for_status = MagicMock()
|
||||
|
||||
with patch("app.tasks.runner.requests") as mock_req, \
|
||||
patch.dict("os.environ", {"CF_ORCH_URL": "http://cf-orch.local:8700"}), \
|
||||
patch("app.tasks.runner.httpx") as mock_httpx, \
|
||||
patch("circuitforge_orch.client.CFOrchClient") as MockClient:
|
||||
|
||||
mock_req.get.return_value.content = b"fake_image_bytes"
|
||||
mock_req.get.return_value.raise_for_status = lambda: None
|
||||
mock_httpx.post.return_value = chat_resp
|
||||
|
||||
client_instance = _make_orch_client_mock(_VISION_JSON)
|
||||
MockClient.return_value = client_instance
|
||||
|
||||
run_task(tmp_db, task_id, "trust_photo_analysis", 1, _PARAMS)
|
||||
|
||||
conn = sqlite3.connect(tmp_db)
|
||||
score_row = conn.execute(
|
||||
"SELECT photo_analysis_json FROM trust_scores WHERE listing_id=1"
|
||||
).fetchone()
|
||||
task_row = conn.execute(
|
||||
"SELECT status FROM background_tasks WHERE id=?", (task_id,)
|
||||
).fetchone()
|
||||
conn.close()
|
||||
assert task_row[0] == "completed"
|
||||
parsed = json.loads(score_row[0])
|
||||
assert parsed["authenticity_signal"] == "genuine_product_photo"
|
||||
|
||||
|
||||
def test_run_task_photo_analysis_orch_uses_image_assessment_task(tmp_db: Path):
|
||||
"""task_allocate must be called with product='snipe', task='image_assessment'."""
|
||||
task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS)
|
||||
|
||||
chat_resp = MagicMock()
|
||||
chat_resp.json.return_value = {"choices": [{"message": {"content": _VISION_JSON}}]}
|
||||
chat_resp.raise_for_status = MagicMock()
|
||||
|
||||
with patch("app.tasks.runner.requests") as mock_req, \
|
||||
patch.dict("os.environ", {"CF_ORCH_URL": "http://cf-orch.local:8700"}), \
|
||||
patch("app.tasks.runner.httpx") as mock_httpx, \
|
||||
patch("circuitforge_orch.client.CFOrchClient") as MockClient:
|
||||
|
||||
mock_req.get.return_value.content = b"fake_image_bytes"
|
||||
mock_req.get.return_value.raise_for_status = lambda: None
|
||||
mock_httpx.post.return_value = chat_resp
|
||||
|
||||
client_instance = _make_orch_client_mock(_VISION_JSON)
|
||||
MockClient.return_value = client_instance
|
||||
|
||||
run_task(tmp_db, task_id, "trust_photo_analysis", 1, _PARAMS)
|
||||
|
||||
client_instance.task_allocate.assert_called_once_with("snipe", "image_assessment")
|
||||
|
||||
|
||||
def test_run_task_photo_analysis_orch_sends_image_url_content(tmp_db: Path):
|
||||
"""Vision payload must include image_url content block with data URI."""
|
||||
task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS)
|
||||
|
||||
captured_body: dict = {}
|
||||
|
||||
def capture_post(url, **kwargs):
|
||||
nonlocal captured_body
|
||||
if "/v1/chat/completions" in url:
|
||||
captured_body = kwargs.get("json", {})
|
||||
resp = MagicMock()
|
||||
resp.json.return_value = {"choices": [{"message": {"content": _VISION_JSON}}]}
|
||||
resp.raise_for_status = MagicMock()
|
||||
return resp
|
||||
|
||||
with patch("app.tasks.runner.requests") as mock_req, \
|
||||
patch.dict("os.environ", {"CF_ORCH_URL": "http://cf-orch.local:8700"}), \
|
||||
patch("app.tasks.runner.httpx") as mock_httpx, \
|
||||
patch("circuitforge_orch.client.CFOrchClient") as MockClient:
|
||||
|
||||
mock_req.get.return_value.content = b"fake_image_bytes"
|
||||
mock_req.get.return_value.raise_for_status = lambda: None
|
||||
mock_httpx.post.side_effect = capture_post
|
||||
|
||||
client_instance = _make_orch_client_mock(_VISION_JSON)
|
||||
MockClient.return_value = client_instance
|
||||
|
||||
run_task(tmp_db, task_id, "trust_photo_analysis", 1, _PARAMS)
|
||||
|
||||
user_content = captured_body["messages"][1]["content"]
|
||||
image_blocks = [b for b in user_content if b.get("type") == "image_url"]
|
||||
assert image_blocks, "No image_url content block found in vision payload"
|
||||
url = image_blocks[0]["image_url"]["url"]
|
||||
assert url.startswith("data:image/jpeg;base64,"), f"Unexpected image URL format: {url[:40]}"
|
||||
|
||||
|
||||
def test_run_task_photo_analysis_orch_task_not_found_falls_back(tmp_db: Path):
|
||||
"""TaskNotFound from cf-orch → graceful fallback to local LLMRouter."""
|
||||
from circuitforge_orch.client import TaskNotFound
|
||||
|
||||
task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS)
|
||||
|
||||
cm = MagicMock()
|
||||
cm.__enter__ = MagicMock(side_effect=TaskNotFound("snipe", "image_assessment"))
|
||||
cm.__exit__ = MagicMock(return_value=False)
|
||||
|
||||
client_instance = MagicMock()
|
||||
client_instance.task_allocate.return_value = cm
|
||||
|
||||
with patch("app.tasks.runner.requests") as mock_req, \
|
||||
patch.dict("os.environ", {"CF_ORCH_URL": "http://cf-orch.local:8700"}), \
|
||||
patch("circuitforge_orch.client.CFOrchClient", return_value=client_instance), \
|
||||
patch("app.tasks.runner._assess_via_local_llm", return_value=_VISION_JSON) as mock_local:
|
||||
|
||||
mock_req.get.return_value.content = b"fake_image_bytes"
|
||||
mock_req.get.return_value.raise_for_status = lambda: None
|
||||
|
||||
run_task(tmp_db, task_id, "trust_photo_analysis", 1, _PARAMS)
|
||||
|
||||
mock_local.assert_called_once()
|
||||
|
||||
conn = sqlite3.connect(tmp_db)
|
||||
task_row = conn.execute(
|
||||
"SELECT status FROM background_tasks WHERE id=?", (task_id,)
|
||||
).fetchone()
|
||||
conn.close()
|
||||
assert task_row[0] == "completed"
|
||||
|
||||
|
||||
def test_run_task_photo_analysis_non_json_response_writes_raw(tmp_db: Path):
|
||||
"""Non-JSON LLM response is stored with parse_error flag rather than crashing."""
|
||||
task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS)
|
||||
|
||||
with patch("app.tasks.runner.requests") as mock_req, \
|
||||
patch("app.tasks.runner._assess_via_local_llm", return_value="not valid json at all"):
|
||||
mock_req.get.return_value.content = b"fake_image_bytes"
|
||||
mock_req.get.return_value.raise_for_status = lambda: None
|
||||
run_task(tmp_db, task_id, "trust_photo_analysis", 1, _PARAMS)
|
||||
|
||||
conn = sqlite3.connect(tmp_db)
|
||||
score_row = conn.execute(
|
||||
"SELECT photo_analysis_json FROM trust_scores WHERE listing_id=1"
|
||||
).fetchone()
|
||||
conn.close()
|
||||
parsed = json.loads(score_row[0])
|
||||
assert parsed.get("parse_error") is True
|
||||
assert "raw_response" in parsed
|
||||
|
|
|
|||
Loading…
Reference in a new issue