Compare commits

...

3 commits

Author SHA1 Message Date
4dd44fdafb docs: bump version badge to match latest Forgejo release
Some checks failed
CI / Frontend typecheck + tests (push) Has been cancelled
Mirror / mirror (push) Has been cancelled
CI / Python tests (push) Has been cancelled
2026-05-17 11:19:13 -07:00
263c8522ee feat(tasks): migrate trust_photo_analysis to cf-orch image_assessment task endpoint (#43)
- _assess_via_orch(): uses CFOrchClient.task_allocate('snipe', 'image_assessment')
  with multimodal image_url payload; falls back to local LLMRouter on TaskNotFound
- _assess_via_local_llm(): lazy LLMRouter import, unchanged local path
- CF_ORCH_URL env var selects path at runtime; local users unaffected
- Added circuitforge-orch>=0.1.0 to main dependencies
- 5 new runner tests covering orch happy path, task tag, image_url payload format,
  TaskNotFound fallback, and non-JSON response handling (13 tests total, 244 suite)
2026-05-13 15:43:18 -07:00
1bf95bba2a feat(llm): migrate query_translator to cf-orch task endpoint for cloud, keep LLMRouter for local (#54)
QueryTranslator now supports two backends chosen at startup:
- CF_ORCH_URL set: allocate via POST /api/inference/task (product=snipe,
  task=query_translation), call the allocated cf-text service, release the
  slot in a finally block to guarantee the VRAM lease is freed.
- CF_ORCH_URL absent: existing LLMRouter path unchanged (ollama/vllm/api keys).

Also moves httpx from dev-only to main dependencies (already used by mcp/server.py).
2026-05-13 15:22:09 -07:00
7 changed files with 470 additions and 97 deletions

View file

@ -8,6 +8,7 @@
[![License: MIT / BSL 1.1](https://img.shields.io/badge/license-MIT%20%2F%20BSL%201.1-blue)](LICENSE) [![License: MIT / BSL 1.1](https://img.shields.io/badge/license-MIT%20%2F%20BSL%201.1-blue)](LICENSE)
[![Status: Beta](https://img.shields.io/badge/status-beta-yellow)]() [![Status: Beta](https://img.shields.io/badge/status-beta-yellow)]()
[![Version](https://img.shields.io/badge/version-0.5.1-green)](https://git.opensourcesolarpunk.com/Circuit-Forge/snipe/releases)
[![Forgejo](https://img.shields.io/badge/primary%20repo-Forgejo-orange)](https://git.opensourcesolarpunk.com/Circuit-Forge/snipe) [![Forgejo](https://img.shields.io/badge/primary%20repo-Forgejo-orange)](https://git.opensourcesolarpunk.com/Circuit-Forge/snipe)
[![Docs](https://img.shields.io/badge/docs-docs.circuitforge.tech%2Fsnipe-green)](https://docs.circuitforge.tech/snipe) [![Docs](https://img.shields.io/badge/docs-docs.circuitforge.tech%2Fsnipe-green)](https://docs.circuitforge.tech/snipe)

View file

@ -209,13 +209,21 @@ async def _lifespan(app: FastAPI):
_category_cache.refresh(token_manager=None) # bootstrap fallback _category_cache.refresh(token_manager=None) # bootstrap fallback
try: try:
from app.llm.router import LLMRouter cforch_url = os.getenv("CF_ORCH_URL") or None
_llm_router = LLMRouter() if cforch_url:
_query_translator = QueryTranslator( _query_translator = QueryTranslator(
category_cache=_category_cache, category_cache=_category_cache,
llm_router=_llm_router, cforch_url=cforch_url,
) )
log.info("LLM query builder ready.") log.info("LLM query builder ready (cf-orch).")
else:
from app.llm.router import LLMRouter
_llm_router = LLMRouter()
_query_translator = QueryTranslator(
category_cache=_category_cache,
llm_router=_llm_router,
)
log.info("LLM query builder ready (local LLM).")
except Exception: except Exception:
log.info("No LLM backend configured — query builder disabled.") log.info("No LLM backend configured — query builder disabled.")
except Exception: except Exception:
@ -2005,7 +2013,7 @@ async def build_search_query(
if translator is None: if translator is None:
raise HTTPException( raise HTTPException(
status_code=503, status_code=503,
detail="No LLM backend configured. Set OLLAMA_HOST, ANTHROPIC_API_KEY, or OPENAI_API_KEY.", detail="No LLM backend configured. Set CF_ORCH_URL (cloud) or OLLAMA_HOST / ANTHROPIC_API_KEY / OPENAI_API_KEY (local).",
) )
from app.llm.query_translator import QueryTranslatorError from app.llm.query_translator import QueryTranslatorError

View file

@ -2,9 +2,15 @@
# BSL 1.1 License # BSL 1.1 License
"""LLM query builder — translates natural language to eBay SearchFilters. """LLM query builder — translates natural language to eBay SearchFilters.
The QueryTranslator calls LLMRouter.complete() (synchronous) with a domain-aware Supports two backends, selected at construction time:
system prompt. The prompt includes category hints injected from EbayCategoryCache.
The LLM returns a single JSON object matching SearchParamsResponse. cforch_url cf-orch task endpoint (cloud/premium). The coordinator resolves
product+task to a model and returns an allocation. The caller
POSTs to the allocated service URL, then DELETEs the allocation.
llm_router circuitforge_core.LLMRouter (local installs: ollama/vllm/api keys).
Exactly one of cforch_url or llm_router must be supplied.
""" """
from __future__ import annotations from __future__ import annotations
@ -13,6 +19,8 @@ import logging
from dataclasses import dataclass from dataclasses import dataclass
from typing import TYPE_CHECKING, Optional from typing import TYPE_CHECKING, Optional
import httpx
if TYPE_CHECKING: if TYPE_CHECKING:
from app.platforms.ebay.categories import EbayCategoryCache from app.platforms.ebay.categories import EbayCategoryCache
@ -128,11 +136,23 @@ class QueryTranslator:
Args: Args:
category_cache: An EbayCategoryCache instance (may have empty cache). category_cache: An EbayCategoryCache instance (may have empty cache).
llm_router: An LLMRouter instance from circuitforge_core. cforch_url: cf-orch coordinator base URL (cloud/premium path).
llm_router: A circuitforge_core LLMRouter instance (local path).
Exactly one of cforch_url or llm_router must be provided.
""" """
def __init__(self, category_cache: "EbayCategoryCache", llm_router: object) -> None: def __init__(
self,
category_cache: "EbayCategoryCache",
*,
cforch_url: str | None = None,
llm_router: object | None = None,
) -> None:
if cforch_url is None and llm_router is None:
raise ValueError("Either cforch_url or llm_router must be provided")
self._cache = category_cache self._cache = category_cache
self._cforch_url = cforch_url
self._llm_router = llm_router self._llm_router = llm_router
def translate(self, natural_language: str) -> SearchParamsResponse: def translate(self, natural_language: str) -> SearchParamsResponse:
@ -154,14 +174,58 @@ class QueryTranslator:
system_prompt = _SYSTEM_PROMPT_TEMPLATE.format(category_hints=category_hints) system_prompt = _SYSTEM_PROMPT_TEMPLATE.format(category_hints=category_hints)
try: try:
raw = self._llm_router.complete( if self._cforch_url:
natural_language, raw = self._call_orch(system_prompt, natural_language)
system=system_prompt, else:
max_tokens=512, raw = self._call_local(system_prompt, natural_language)
) except QueryTranslatorError:
raise
except Exception as exc: except Exception as exc:
raise QueryTranslatorError( raise QueryTranslatorError(
f"LLM backend error: {exc}", raw="" f"LLM backend error: {exc}", raw=""
) from exc ) from exc
return _parse_response(raw) return _parse_response(raw)
def _call_orch(self, system_prompt: str, user_message: str) -> str:
"""Allocate via cf-orch task endpoint, call the model, release the slot."""
alloc_resp = httpx.post(
f"{self._cforch_url}/api/inference/task",
json={"product": "snipe", "task": "query_translation"},
timeout=10.0,
)
alloc_resp.raise_for_status()
alloc = alloc_resp.json()
service_url = alloc["url"]
allocation_id = alloc["allocation_id"]
try:
resp = httpx.post(
f"{service_url}/v1/chat/completions",
json={
"model": "__auto__",
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_message},
],
"max_tokens": 512,
},
timeout=60.0,
)
resp.raise_for_status()
return resp.json()["choices"][0]["message"]["content"]
finally:
try:
httpx.delete(
f"{self._cforch_url}/api/services/cf-text/allocations/{allocation_id}",
timeout=5.0,
)
except Exception:
log.warning("Failed to release cf-orch allocation %s", allocation_id)
def _call_local(self, system_prompt: str, user_message: str) -> str:
"""Call the locally-configured LLMRouter (ollama/vllm/api keys)."""
return self._llm_router.complete( # type: ignore[union-attr]
user_message,
system=system_prompt,
max_tokens=512,
)

View file

@ -7,28 +7,30 @@ Current task types:
trust_photo_analysis download primary photo, run vision LLM, write trust_photo_analysis download primary photo, run vision LLM, write
result to trust_scores.photo_analysis_json (Paid tier). result to trust_scores.photo_analysis_json (Paid tier).
Prompt note: The vision prompt is a functional first pass. Tune against real Image assessment routing:
eBay listings before GA specifically stock-photo vs genuine-product distinction Cloud (CF_ORCH_URL set): allocates via cf-orch task endpoint
and the damage vocabulary. product=snipe, task=image_assessment.
Local (no CF_ORCH_URL) or TaskNotFound fallback: uses LLMRouter
with a vision-capable local backend (moondream2, llava, etc.).
""" """
from __future__ import annotations from __future__ import annotations
import base64 import base64
import json import json
import logging import logging
import os
from pathlib import Path from pathlib import Path
import httpx
import requests import requests
from circuitforge_core.db import get_connection from circuitforge_core.db import get_connection
from circuitforge_core.llm import LLMRouter
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
LLM_TASK_TYPES: frozenset[str] = frozenset({"trust_photo_analysis"}) LLM_TASK_TYPES: frozenset[str] = frozenset({"trust_photo_analysis"})
VRAM_BUDGETS: dict[str, float] = { VRAM_BUDGETS: dict[str, float] = {
# moondream2 / vision-capable LLM — single image, short response "trust_photo_analysis": 6000, # Q5_K_M Qwen2-VL via cf-orch; LLMRouter fallback uses 2.0 GB
"trust_photo_analysis": 2.0,
} }
_VISION_SYSTEM_PROMPT = ( _VISION_SYSTEM_PROMPT = (
@ -51,8 +53,7 @@ def insert_task(
) -> tuple[int, bool]: ) -> tuple[int, bool]:
"""Insert a background task if no identical task is already in-flight. """Insert a background task if no identical task is already in-flight.
Uses get_connection() so WAL mode and timeout=30 apply same as all other Returns (task_id, is_new).
Snipe DB access. Returns (task_id, is_new).
""" """
conn = get_connection(db_path) conn = get_connection(db_path)
conn.row_factory = __import__("sqlite3").Row conn.row_factory = __import__("sqlite3").Row
@ -120,32 +121,26 @@ def _run_trust_photo_analysis(
p = json.loads(params or "{}") p = json.loads(params or "{}")
photo_url = p.get("photo_url", "") photo_url = p.get("photo_url", "")
listing_title = p.get("listing_title", "") listing_title = p.get("listing_title", "")
# user_db: per-user DB in cloud mode; same as db_path in local mode.
result_db = Path(p.get("user_db", str(db_path))) result_db = Path(p.get("user_db", str(db_path)))
if not photo_url: if not photo_url:
raise ValueError("trust_photo_analysis: 'photo_url' is required in params") raise ValueError("trust_photo_analysis: 'photo_url' is required in params")
# Download and base64-encode the photo
resp = requests.get(photo_url, timeout=10) resp = requests.get(photo_url, timeout=10)
resp.raise_for_status() resp.raise_for_status()
image_b64 = base64.b64encode(resp.content).decode() image_b64 = base64.b64encode(resp.content).decode()
image_data_url = f"data:image/jpeg;base64,{image_b64}"
# Build user prompt with optional title context user_prompt = "Assess this listing image."
user_prompt = "Evaluate this eBay listing photo."
if listing_title: if listing_title:
user_prompt = f"Evaluate this eBay listing photo for: {listing_title}" user_prompt = f"Assess this eBay listing image: {listing_title}"
# Call LLMRouter with vision capability cforch_url = os.getenv("CF_ORCH_URL")
router = LLMRouter() if cforch_url:
raw = router.complete( raw = _assess_via_orch(cforch_url, image_data_url, user_prompt)
user_prompt, else:
system=_VISION_SYSTEM_PROMPT, raw = _assess_via_local_llm(image_b64, user_prompt)
images=[image_b64],
max_tokens=128,
)
# Parse — be lenient: strip markdown fences if present
try: try:
cleaned = raw.strip().removeprefix("```json").removeprefix("```").removesuffix("```").strip() cleaned = raw.strip().removeprefix("```json").removeprefix("```").removesuffix("```").strip()
analysis = json.loads(cleaned) analysis = json.loads(cleaned)
@ -168,3 +163,54 @@ def _run_trust_photo_analysis(
analysis.get("visible_damage"), analysis.get("visible_damage"),
analysis.get("confidence"), analysis.get("confidence"),
) )
def _assess_via_orch(cforch_url: str, image_data_url: str, user_prompt: str) -> str:
"""Run photo assessment via cf-orch task endpoint (cloud path)."""
from circuitforge_orch.client import CFOrchClient, TaskNotFound
client = CFOrchClient(cforch_url)
try:
with client.task_allocate("snipe", "image_assessment") as alloc:
resp = httpx.post(
f"{alloc.url}/v1/chat/completions",
json={
"model": alloc.model or "__auto__",
"messages": [
{
"role": "system",
"content": _VISION_SYSTEM_PROMPT,
},
{
"role": "user",
"content": [
{"type": "image_url", "image_url": {"url": image_data_url}},
{"type": "text", "text": user_prompt},
],
},
],
"max_tokens": 128,
},
timeout=60.0,
)
resp.raise_for_status()
return resp.json()["choices"][0]["message"]["content"]
except TaskNotFound:
log.warning(
"snipe.image_assessment not registered in cf-orch — falling back to local LLM"
)
image_b64 = image_data_url.split(",", 1)[1]
return _assess_via_local_llm(image_b64, user_prompt)
def _assess_via_local_llm(image_b64: str, user_prompt: str) -> str:
"""Run photo assessment via local LLMRouter (local/self-hosted path)."""
from app.llm.router import LLMRouter
router = LLMRouter()
return router.complete(
user_prompt,
system=_VISION_SYSTEM_PROMPT,
images=[image_b64],
max_tokens=128,
)

View file

@ -23,6 +23,8 @@ dependencies = [
"playwright-stealth>=1.0", "playwright-stealth>=1.0",
"cryptography>=42.0", "cryptography>=42.0",
"PyJWT>=2.8", "PyJWT>=2.8",
"httpx>=0.27",
"circuitforge-orch>=0.1.0",
] ]
[project.optional-dependencies] [project.optional-dependencies]
@ -30,7 +32,6 @@ dev = [
"pytest>=8.0", "pytest>=8.0",
"pytest-cov>=5.0", "pytest-cov>=5.0",
"ruff>=0.4", "ruff>=0.4",
"httpx>=0.27", # FastAPI test client
] ]
[tool.setuptools.packages.find] [tool.setuptools.packages.find]

View file

@ -1,4 +1,4 @@
"""Unit tests for QueryTranslator — LLMRouter mocked at boundary.""" """Unit tests for QueryTranslator — LLMRouter and cf-orch backends mocked at boundary."""
from __future__ import annotations from __future__ import annotations
import json import json
@ -73,7 +73,7 @@ def test_parse_response_missing_required_field():
_parse_response(raw) _parse_response(raw)
# ── QueryTranslator (integration with mocked LLMRouter) ────────────────────── # ── Fixtures ──────────────────────────────────────────────────────────────────
from app.platforms.ebay.categories import EbayCategoryCache from app.platforms.ebay.categories import EbayCategoryCache
from circuitforge_core.db import get_connection, run_migrations from circuitforge_core.db import get_connection, run_migrations
@ -88,7 +88,22 @@ def db_with_categories(tmp_path):
return conn return conn
def _make_translator(db_conn, llm_response: str) -> QueryTranslator: _VALID_LLM_RESPONSE = json.dumps({
"base_query": "RTX 3080",
"must_include_mode": "groups",
"must_include": "rtx|geforce, 3080",
"must_exclude": "mining,for parts",
"max_price": 300.0,
"min_price": None,
"condition": ["used"],
"category_id": "27386",
"explanation": "Searching for used RTX 3080 GPUs under $300.",
})
# ── Local LLMRouter backend ───────────────────────────────────────────────────
def _make_local_translator(db_conn, llm_response: str) -> QueryTranslator:
from app.platforms.ebay.categories import EbayCategoryCache from app.platforms.ebay.categories import EbayCategoryCache
cache = EbayCategoryCache(db_conn) cache = EbayCategoryCache(db_conn)
mock_router = MagicMock() mock_router = MagicMock()
@ -97,18 +112,7 @@ def _make_translator(db_conn, llm_response: str) -> QueryTranslator:
def test_translate_returns_search_params(db_with_categories): def test_translate_returns_search_params(db_with_categories):
llm_out = json.dumps({ t = _make_local_translator(db_with_categories, _VALID_LLM_RESPONSE)
"base_query": "RTX 3080",
"must_include_mode": "groups",
"must_include": "rtx|geforce, 3080",
"must_exclude": "mining,for parts",
"max_price": 300.0,
"min_price": None,
"condition": ["used"],
"category_id": "27386",
"explanation": "Searching for used RTX 3080 GPUs under $300.",
})
t = _make_translator(db_with_categories, llm_out)
result = t.translate("used RTX 3080 under $300 no mining") result = t.translate("used RTX 3080 under $300 no mining")
assert result.base_query == "RTX 3080" assert result.base_query == "RTX 3080"
assert result.max_price == 300.0 assert result.max_price == 300.0
@ -116,18 +120,7 @@ def test_translate_returns_search_params(db_with_categories):
def test_translate_injects_category_hints(db_with_categories): def test_translate_injects_category_hints(db_with_categories):
"""The system prompt sent to the LLM must contain category_id hints.""" """The system prompt sent to the LLM must contain category_id hints."""
llm_out = json.dumps({ t = _make_local_translator(db_with_categories, _VALID_LLM_RESPONSE)
"base_query": "GPU",
"must_include_mode": "all",
"must_include": "",
"must_exclude": "",
"max_price": None,
"min_price": None,
"condition": [],
"category_id": None,
"explanation": "Searching for GPUs.",
})
t = _make_translator(db_with_categories, llm_out)
t.translate("GPU") t.translate("GPU")
call_args = t._llm_router.complete.call_args call_args = t._llm_router.complete.call_args
system_prompt = call_args.kwargs.get("system") or call_args.args[1] system_prompt = call_args.kwargs.get("system") or call_args.args[1]
@ -141,7 +134,7 @@ def test_translate_empty_category_cache_still_works(tmp_path):
conn = get_connection(tmp_path / "empty.db") conn = get_connection(tmp_path / "empty.db")
run_migrations(conn, Path("app/db/migrations")) run_migrations(conn, Path("app/db/migrations"))
# Do NOT seed bootstrap — empty cache # Do NOT seed bootstrap — empty cache
llm_out = json.dumps({ t = _make_local_translator(conn, json.dumps({
"base_query": "vinyl", "base_query": "vinyl",
"must_include_mode": "all", "must_include_mode": "all",
"must_include": "", "must_include": "",
@ -151,8 +144,7 @@ def test_translate_empty_category_cache_still_works(tmp_path):
"condition": [], "condition": [],
"category_id": None, "category_id": None,
"explanation": "Searching for vinyl records.", "explanation": "Searching for vinyl records.",
}) }))
t = _make_translator(conn, llm_out)
result = t.translate("vinyl records") result = t.translate("vinyl records")
assert result.base_query == "vinyl" assert result.base_query == "vinyl"
call_args = t._llm_router.complete.call_args call_args = t._llm_router.complete.call_args
@ -168,3 +160,101 @@ def test_translate_llm_error_raises_query_translator_error(db_with_categories):
t = QueryTranslator(category_cache=cache, llm_router=mock_router) t = QueryTranslator(category_cache=cache, llm_router=mock_router)
with pytest.raises(QueryTranslatorError, match="LLM backend"): with pytest.raises(QueryTranslatorError, match="LLM backend"):
t.translate("used GPU") t.translate("used GPU")
# ── cf-orch backend ───────────────────────────────────────────────────────────
def _make_orch_translator(db_conn) -> QueryTranslator:
from app.platforms.ebay.categories import EbayCategoryCache
cache = EbayCategoryCache(db_conn)
return QueryTranslator(category_cache=cache, cforch_url="http://orch.local:8700")
def _mock_alloc_response() -> MagicMock:
resp = MagicMock()
resp.json.return_value = {
"url": "http://cf-text.local:11434",
"allocation_id": "alloc-abc123",
"node_id": "heimdall",
}
resp.raise_for_status.return_value = None
return resp
def _mock_chat_response(content: str) -> MagicMock:
resp = MagicMock()
resp.json.return_value = {
"choices": [{"message": {"content": content}}]
}
resp.raise_for_status.return_value = None
return resp
def _mock_delete_response() -> MagicMock:
resp = MagicMock()
resp.raise_for_status.return_value = None
return resp
def test_orch_translate_returns_search_params(db_with_categories):
t = _make_orch_translator(db_with_categories)
with patch("httpx.post") as mock_post, patch("httpx.delete") as mock_delete:
mock_post.side_effect = [
_mock_alloc_response(),
_mock_chat_response(_VALID_LLM_RESPONSE),
]
mock_delete.return_value = _mock_delete_response()
result = t.translate("used RTX 3080 under $300")
assert result.base_query == "RTX 3080"
assert result.max_price == 300.0
def test_orch_allocates_with_correct_task_tag(db_with_categories):
t = _make_orch_translator(db_with_categories)
with patch("httpx.post") as mock_post, patch("httpx.delete"):
mock_post.side_effect = [
_mock_alloc_response(),
_mock_chat_response(_VALID_LLM_RESPONSE),
]
t.translate("GPU")
alloc_call = mock_post.call_args_list[0]
assert alloc_call.args[0] == "http://orch.local:8700/api/inference/task"
body = alloc_call.kwargs.get("json") or alloc_call.args[1]
assert body == {"product": "snipe", "task": "query_translation"}
def test_orch_releases_allocation_after_success(db_with_categories):
t = _make_orch_translator(db_with_categories)
with patch("httpx.post") as mock_post, patch("httpx.delete") as mock_delete:
mock_post.side_effect = [
_mock_alloc_response(),
_mock_chat_response(_VALID_LLM_RESPONSE),
]
mock_delete.return_value = _mock_delete_response()
t.translate("GPU")
mock_delete.assert_called_once()
delete_url = mock_delete.call_args.args[0]
assert "alloc-abc123" in delete_url
def test_orch_releases_allocation_on_inference_failure(db_with_categories):
"""Allocation must be released even when the inference call fails."""
t = _make_orch_translator(db_with_categories)
with patch("httpx.post") as mock_post, patch("httpx.delete") as mock_delete:
mock_post.side_effect = [
_mock_alloc_response(),
Exception("inference timeout"),
]
mock_delete.return_value = _mock_delete_response()
with pytest.raises(QueryTranslatorError, match="LLM backend"):
t.translate("GPU")
mock_delete.assert_called_once()
def test_init_requires_at_least_one_backend(tmp_path):
from circuitforge_core.db import get_connection, run_migrations
conn = get_connection(tmp_path / "test.db")
run_migrations(conn, Path("app/db/migrations"))
cache = EbayCategoryCache(conn)
with pytest.raises(ValueError, match="cforch_url or llm_router"):
QueryTranslator(category_cache=cache)

View file

@ -4,7 +4,7 @@ from __future__ import annotations
import json import json
import sqlite3 import sqlite3
from pathlib import Path from pathlib import Path
from unittest.mock import patch from unittest.mock import MagicMock, patch, call
import pytest import pytest
@ -47,6 +47,19 @@ def tmp_db(tmp_path: Path) -> Path:
return db return db
_VISION_JSON = json.dumps({
"is_stock_photo": False,
"visible_damage": False,
"authenticity_signal": "genuine_product_photo",
"confidence": "high",
})
_PARAMS = json.dumps({
"photo_url": "https://example.com/photo.jpg",
"listing_title": "Used iPhone 13",
})
def test_llm_task_types_defined(): def test_llm_task_types_defined():
assert "trust_photo_analysis" in LLM_TASK_TYPES assert "trust_photo_analysis" in LLM_TASK_TYPES
@ -75,29 +88,17 @@ def test_insert_task_dedup(tmp_db: Path):
assert new2 is False assert new2 is False
def test_run_task_photo_analysis_success(tmp_db: Path): # ── Local LLMRouter path ──────────────────────────────────────────────────────
"""Vision analysis result is written to trust_scores.photo_analysis_json."""
params = json.dumps({
"listing_id": 1,
"photo_url": "https://example.com/photo.jpg",
"listing_title": "Used iPhone 13",
})
task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=params)
vision_result = { def test_run_task_photo_analysis_local_success(tmp_db: Path):
"is_stock_photo": False, """Local path: vision result is written to trust_scores.photo_analysis_json."""
"visible_damage": False, task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS)
"authenticity_signal": "genuine_product_photo",
"confidence": "high",
}
with patch("app.tasks.runner.requests") as mock_req, \ with patch("app.tasks.runner.requests") as mock_req, \
patch("app.tasks.runner.LLMRouter") as MockRouter: patch("app.tasks.runner._assess_via_local_llm", return_value=_VISION_JSON):
mock_req.get.return_value.content = b"fake_image_bytes" mock_req.get.return_value.content = b"fake_image_bytes"
mock_req.get.return_value.raise_for_status = lambda: None mock_req.get.return_value.raise_for_status = lambda: None
instance = MockRouter.return_value run_task(tmp_db, task_id, "trust_photo_analysis", 1, _PARAMS)
instance.complete.return_value = json.dumps(vision_result)
run_task(tmp_db, task_id, "trust_photo_analysis", 1, params)
conn = sqlite3.connect(tmp_db) conn = sqlite3.connect(tmp_db)
score_row = conn.execute( score_row = conn.execute(
@ -110,20 +111,16 @@ def test_run_task_photo_analysis_success(tmp_db: Path):
assert task_row[0] == "completed" assert task_row[0] == "completed"
parsed = json.loads(score_row[0]) parsed = json.loads(score_row[0])
assert parsed["is_stock_photo"] is False assert parsed["is_stock_photo"] is False
assert parsed["confidence"] == "high"
def test_run_task_photo_fetch_failure_marks_failed(tmp_db: Path): def test_run_task_photo_fetch_failure_marks_failed(tmp_db: Path):
"""If photo download fails, task is marked failed without crashing.""" """If photo download fails, task is marked failed without crashing."""
params = json.dumps({ task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS)
"listing_id": 1,
"photo_url": "https://example.com/bad.jpg",
"listing_title": "Laptop",
})
task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=params)
with patch("app.tasks.runner.requests") as mock_req: with patch("app.tasks.runner.requests") as mock_req:
mock_req.get.side_effect = ConnectionError("fetch failed") mock_req.get.side_effect = ConnectionError("fetch failed")
run_task(tmp_db, task_id, "trust_photo_analysis", 1, params) run_task(tmp_db, task_id, "trust_photo_analysis", 1, _PARAMS)
conn = sqlite3.connect(tmp_db) conn = sqlite3.connect(tmp_db)
row = conn.execute( row = conn.execute(
@ -156,3 +153,169 @@ def test_run_task_unknown_type_marks_failed(tmp_db: Path):
).fetchone() ).fetchone()
conn.close() conn.close()
assert row[0] == "failed" assert row[0] == "failed"
# ── cf-orch path ──────────────────────────────────────────────────────────────
def _make_orch_client_mock(vision_json: str) -> MagicMock:
"""Build a CFOrchClient mock whose task_allocate context manager returns an Allocation."""
alloc = MagicMock()
alloc.url = "http://cf-vlm.local:8000"
alloc.model = "bartowski--qwen2-vl-7b-instruct-gguf"
cm = MagicMock()
cm.__enter__ = MagicMock(return_value=alloc)
cm.__exit__ = MagicMock(return_value=False)
client = MagicMock()
client.task_allocate.return_value = cm
return client
def test_run_task_photo_analysis_orch_success(tmp_db: Path):
"""Cloud path: CFOrchClient.task_allocate is used when CF_ORCH_URL is set."""
task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS)
chat_resp = MagicMock()
chat_resp.json.return_value = {"choices": [{"message": {"content": _VISION_JSON}}]}
chat_resp.raise_for_status = MagicMock()
with patch("app.tasks.runner.requests") as mock_req, \
patch.dict("os.environ", {"CF_ORCH_URL": "http://cf-orch.local:8700"}), \
patch("app.tasks.runner.httpx") as mock_httpx, \
patch("circuitforge_orch.client.CFOrchClient") as MockClient:
mock_req.get.return_value.content = b"fake_image_bytes"
mock_req.get.return_value.raise_for_status = lambda: None
mock_httpx.post.return_value = chat_resp
client_instance = _make_orch_client_mock(_VISION_JSON)
MockClient.return_value = client_instance
run_task(tmp_db, task_id, "trust_photo_analysis", 1, _PARAMS)
conn = sqlite3.connect(tmp_db)
score_row = conn.execute(
"SELECT photo_analysis_json FROM trust_scores WHERE listing_id=1"
).fetchone()
task_row = conn.execute(
"SELECT status FROM background_tasks WHERE id=?", (task_id,)
).fetchone()
conn.close()
assert task_row[0] == "completed"
parsed = json.loads(score_row[0])
assert parsed["authenticity_signal"] == "genuine_product_photo"
def test_run_task_photo_analysis_orch_uses_image_assessment_task(tmp_db: Path):
"""task_allocate must be called with product='snipe', task='image_assessment'."""
task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS)
chat_resp = MagicMock()
chat_resp.json.return_value = {"choices": [{"message": {"content": _VISION_JSON}}]}
chat_resp.raise_for_status = MagicMock()
with patch("app.tasks.runner.requests") as mock_req, \
patch.dict("os.environ", {"CF_ORCH_URL": "http://cf-orch.local:8700"}), \
patch("app.tasks.runner.httpx") as mock_httpx, \
patch("circuitforge_orch.client.CFOrchClient") as MockClient:
mock_req.get.return_value.content = b"fake_image_bytes"
mock_req.get.return_value.raise_for_status = lambda: None
mock_httpx.post.return_value = chat_resp
client_instance = _make_orch_client_mock(_VISION_JSON)
MockClient.return_value = client_instance
run_task(tmp_db, task_id, "trust_photo_analysis", 1, _PARAMS)
client_instance.task_allocate.assert_called_once_with("snipe", "image_assessment")
def test_run_task_photo_analysis_orch_sends_image_url_content(tmp_db: Path):
"""Vision payload must include image_url content block with data URI."""
task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS)
captured_body: dict = {}
def capture_post(url, **kwargs):
nonlocal captured_body
if "/v1/chat/completions" in url:
captured_body = kwargs.get("json", {})
resp = MagicMock()
resp.json.return_value = {"choices": [{"message": {"content": _VISION_JSON}}]}
resp.raise_for_status = MagicMock()
return resp
with patch("app.tasks.runner.requests") as mock_req, \
patch.dict("os.environ", {"CF_ORCH_URL": "http://cf-orch.local:8700"}), \
patch("app.tasks.runner.httpx") as mock_httpx, \
patch("circuitforge_orch.client.CFOrchClient") as MockClient:
mock_req.get.return_value.content = b"fake_image_bytes"
mock_req.get.return_value.raise_for_status = lambda: None
mock_httpx.post.side_effect = capture_post
client_instance = _make_orch_client_mock(_VISION_JSON)
MockClient.return_value = client_instance
run_task(tmp_db, task_id, "trust_photo_analysis", 1, _PARAMS)
user_content = captured_body["messages"][1]["content"]
image_blocks = [b for b in user_content if b.get("type") == "image_url"]
assert image_blocks, "No image_url content block found in vision payload"
url = image_blocks[0]["image_url"]["url"]
assert url.startswith("data:image/jpeg;base64,"), f"Unexpected image URL format: {url[:40]}"
def test_run_task_photo_analysis_orch_task_not_found_falls_back(tmp_db: Path):
"""TaskNotFound from cf-orch → graceful fallback to local LLMRouter."""
from circuitforge_orch.client import TaskNotFound
task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS)
cm = MagicMock()
cm.__enter__ = MagicMock(side_effect=TaskNotFound("snipe", "image_assessment"))
cm.__exit__ = MagicMock(return_value=False)
client_instance = MagicMock()
client_instance.task_allocate.return_value = cm
with patch("app.tasks.runner.requests") as mock_req, \
patch.dict("os.environ", {"CF_ORCH_URL": "http://cf-orch.local:8700"}), \
patch("circuitforge_orch.client.CFOrchClient", return_value=client_instance), \
patch("app.tasks.runner._assess_via_local_llm", return_value=_VISION_JSON) as mock_local:
mock_req.get.return_value.content = b"fake_image_bytes"
mock_req.get.return_value.raise_for_status = lambda: None
run_task(tmp_db, task_id, "trust_photo_analysis", 1, _PARAMS)
mock_local.assert_called_once()
conn = sqlite3.connect(tmp_db)
task_row = conn.execute(
"SELECT status FROM background_tasks WHERE id=?", (task_id,)
).fetchone()
conn.close()
assert task_row[0] == "completed"
def test_run_task_photo_analysis_non_json_response_writes_raw(tmp_db: Path):
"""Non-JSON LLM response is stored with parse_error flag rather than crashing."""
task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS)
with patch("app.tasks.runner.requests") as mock_req, \
patch("app.tasks.runner._assess_via_local_llm", return_value="not valid json at all"):
mock_req.get.return_value.content = b"fake_image_bytes"
mock_req.get.return_value.raise_for_status = lambda: None
run_task(tmp_db, task_id, "trust_photo_analysis", 1, _PARAMS)
conn = sqlite3.connect(tmp_db)
score_row = conn.execute(
"SELECT photo_analysis_json FROM trust_scores WHERE listing_id=1"
).fetchone()
conn.close()
parsed = json.loads(score_row[0])
assert parsed.get("parse_error") is True
assert "raw_response" in parsed