feat(tasks): migrate trust_photo_analysis to cf-orch image_assessment task endpoint (#43)

- _assess_via_orch(): uses CFOrchClient.task_allocate('snipe', 'image_assessment')
  with multimodal image_url payload; falls back to local LLMRouter on TaskNotFound
- _assess_via_local_llm(): lazy LLMRouter import, unchanged local path
- CF_ORCH_URL env var selects path at runtime; local users unaffected
- Added circuitforge-orch>=0.1.0 to main dependencies
- 5 new runner tests covering orch happy path, task tag, image_url payload format,
  TaskNotFound fallback, and non-JSON response handling (13 tests total, 244 suite)
This commit is contained in:
pyr0ball 2026-05-13 15:43:18 -07:00
parent 1bf95bba2a
commit 263c8522ee
3 changed files with 258 additions and 48 deletions

View file

@ -7,28 +7,30 @@ Current task types:
trust_photo_analysis download primary photo, run vision LLM, write trust_photo_analysis download primary photo, run vision LLM, write
result to trust_scores.photo_analysis_json (Paid tier). result to trust_scores.photo_analysis_json (Paid tier).
Prompt note: The vision prompt is a functional first pass. Tune against real Image assessment routing:
eBay listings before GA specifically stock-photo vs genuine-product distinction Cloud (CF_ORCH_URL set): allocates via cf-orch task endpoint
and the damage vocabulary. product=snipe, task=image_assessment.
Local (no CF_ORCH_URL) or TaskNotFound fallback: uses LLMRouter
with a vision-capable local backend (moondream2, llava, etc.).
""" """
from __future__ import annotations from __future__ import annotations
import base64 import base64
import json import json
import logging import logging
import os
from pathlib import Path from pathlib import Path
import httpx
import requests import requests
from circuitforge_core.db import get_connection from circuitforge_core.db import get_connection
from circuitforge_core.llm import LLMRouter
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
LLM_TASK_TYPES: frozenset[str] = frozenset({"trust_photo_analysis"}) LLM_TASK_TYPES: frozenset[str] = frozenset({"trust_photo_analysis"})
VRAM_BUDGETS: dict[str, float] = { VRAM_BUDGETS: dict[str, float] = {
# moondream2 / vision-capable LLM — single image, short response "trust_photo_analysis": 6000, # Q5_K_M Qwen2-VL via cf-orch; LLMRouter fallback uses 2.0 GB
"trust_photo_analysis": 2.0,
} }
_VISION_SYSTEM_PROMPT = ( _VISION_SYSTEM_PROMPT = (
@ -51,8 +53,7 @@ def insert_task(
) -> tuple[int, bool]: ) -> tuple[int, bool]:
"""Insert a background task if no identical task is already in-flight. """Insert a background task if no identical task is already in-flight.
Uses get_connection() so WAL mode and timeout=30 apply same as all other Returns (task_id, is_new).
Snipe DB access. Returns (task_id, is_new).
""" """
conn = get_connection(db_path) conn = get_connection(db_path)
conn.row_factory = __import__("sqlite3").Row conn.row_factory = __import__("sqlite3").Row
@ -120,32 +121,26 @@ def _run_trust_photo_analysis(
p = json.loads(params or "{}") p = json.loads(params or "{}")
photo_url = p.get("photo_url", "") photo_url = p.get("photo_url", "")
listing_title = p.get("listing_title", "") listing_title = p.get("listing_title", "")
# user_db: per-user DB in cloud mode; same as db_path in local mode.
result_db = Path(p.get("user_db", str(db_path))) result_db = Path(p.get("user_db", str(db_path)))
if not photo_url: if not photo_url:
raise ValueError("trust_photo_analysis: 'photo_url' is required in params") raise ValueError("trust_photo_analysis: 'photo_url' is required in params")
# Download and base64-encode the photo
resp = requests.get(photo_url, timeout=10) resp = requests.get(photo_url, timeout=10)
resp.raise_for_status() resp.raise_for_status()
image_b64 = base64.b64encode(resp.content).decode() image_b64 = base64.b64encode(resp.content).decode()
image_data_url = f"data:image/jpeg;base64,{image_b64}"
# Build user prompt with optional title context user_prompt = "Assess this listing image."
user_prompt = "Evaluate this eBay listing photo."
if listing_title: if listing_title:
user_prompt = f"Evaluate this eBay listing photo for: {listing_title}" user_prompt = f"Assess this eBay listing image: {listing_title}"
# Call LLMRouter with vision capability cforch_url = os.getenv("CF_ORCH_URL")
router = LLMRouter() if cforch_url:
raw = router.complete( raw = _assess_via_orch(cforch_url, image_data_url, user_prompt)
user_prompt, else:
system=_VISION_SYSTEM_PROMPT, raw = _assess_via_local_llm(image_b64, user_prompt)
images=[image_b64],
max_tokens=128,
)
# Parse — be lenient: strip markdown fences if present
try: try:
cleaned = raw.strip().removeprefix("```json").removeprefix("```").removesuffix("```").strip() cleaned = raw.strip().removeprefix("```json").removeprefix("```").removesuffix("```").strip()
analysis = json.loads(cleaned) analysis = json.loads(cleaned)
@ -168,3 +163,54 @@ def _run_trust_photo_analysis(
analysis.get("visible_damage"), analysis.get("visible_damage"),
analysis.get("confidence"), analysis.get("confidence"),
) )
def _assess_via_orch(cforch_url: str, image_data_url: str, user_prompt: str) -> str:
"""Run photo assessment via cf-orch task endpoint (cloud path)."""
from circuitforge_orch.client import CFOrchClient, TaskNotFound
client = CFOrchClient(cforch_url)
try:
with client.task_allocate("snipe", "image_assessment") as alloc:
resp = httpx.post(
f"{alloc.url}/v1/chat/completions",
json={
"model": alloc.model or "__auto__",
"messages": [
{
"role": "system",
"content": _VISION_SYSTEM_PROMPT,
},
{
"role": "user",
"content": [
{"type": "image_url", "image_url": {"url": image_data_url}},
{"type": "text", "text": user_prompt},
],
},
],
"max_tokens": 128,
},
timeout=60.0,
)
resp.raise_for_status()
return resp.json()["choices"][0]["message"]["content"]
except TaskNotFound:
log.warning(
"snipe.image_assessment not registered in cf-orch — falling back to local LLM"
)
image_b64 = image_data_url.split(",", 1)[1]
return _assess_via_local_llm(image_b64, user_prompt)
def _assess_via_local_llm(image_b64: str, user_prompt: str) -> str:
"""Run photo assessment via local LLMRouter (local/self-hosted path)."""
from app.llm.router import LLMRouter
router = LLMRouter()
return router.complete(
user_prompt,
system=_VISION_SYSTEM_PROMPT,
images=[image_b64],
max_tokens=128,
)

View file

@ -24,6 +24,7 @@ dependencies = [
"cryptography>=42.0", "cryptography>=42.0",
"PyJWT>=2.8", "PyJWT>=2.8",
"httpx>=0.27", "httpx>=0.27",
"circuitforge-orch>=0.1.0",
] ]
[project.optional-dependencies] [project.optional-dependencies]

View file

@ -4,7 +4,7 @@ from __future__ import annotations
import json import json
import sqlite3 import sqlite3
from pathlib import Path from pathlib import Path
from unittest.mock import patch from unittest.mock import MagicMock, patch, call
import pytest import pytest
@ -47,6 +47,19 @@ def tmp_db(tmp_path: Path) -> Path:
return db return db
_VISION_JSON = json.dumps({
"is_stock_photo": False,
"visible_damage": False,
"authenticity_signal": "genuine_product_photo",
"confidence": "high",
})
_PARAMS = json.dumps({
"photo_url": "https://example.com/photo.jpg",
"listing_title": "Used iPhone 13",
})
def test_llm_task_types_defined(): def test_llm_task_types_defined():
assert "trust_photo_analysis" in LLM_TASK_TYPES assert "trust_photo_analysis" in LLM_TASK_TYPES
@ -75,29 +88,17 @@ def test_insert_task_dedup(tmp_db: Path):
assert new2 is False assert new2 is False
def test_run_task_photo_analysis_success(tmp_db: Path): # ── Local LLMRouter path ──────────────────────────────────────────────────────
"""Vision analysis result is written to trust_scores.photo_analysis_json."""
params = json.dumps({
"listing_id": 1,
"photo_url": "https://example.com/photo.jpg",
"listing_title": "Used iPhone 13",
})
task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=params)
vision_result = { def test_run_task_photo_analysis_local_success(tmp_db: Path):
"is_stock_photo": False, """Local path: vision result is written to trust_scores.photo_analysis_json."""
"visible_damage": False, task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS)
"authenticity_signal": "genuine_product_photo",
"confidence": "high",
}
with patch("app.tasks.runner.requests") as mock_req, \ with patch("app.tasks.runner.requests") as mock_req, \
patch("app.tasks.runner.LLMRouter") as MockRouter: patch("app.tasks.runner._assess_via_local_llm", return_value=_VISION_JSON):
mock_req.get.return_value.content = b"fake_image_bytes" mock_req.get.return_value.content = b"fake_image_bytes"
mock_req.get.return_value.raise_for_status = lambda: None mock_req.get.return_value.raise_for_status = lambda: None
instance = MockRouter.return_value run_task(tmp_db, task_id, "trust_photo_analysis", 1, _PARAMS)
instance.complete.return_value = json.dumps(vision_result)
run_task(tmp_db, task_id, "trust_photo_analysis", 1, params)
conn = sqlite3.connect(tmp_db) conn = sqlite3.connect(tmp_db)
score_row = conn.execute( score_row = conn.execute(
@ -110,20 +111,16 @@ def test_run_task_photo_analysis_success(tmp_db: Path):
assert task_row[0] == "completed" assert task_row[0] == "completed"
parsed = json.loads(score_row[0]) parsed = json.loads(score_row[0])
assert parsed["is_stock_photo"] is False assert parsed["is_stock_photo"] is False
assert parsed["confidence"] == "high"
def test_run_task_photo_fetch_failure_marks_failed(tmp_db: Path): def test_run_task_photo_fetch_failure_marks_failed(tmp_db: Path):
"""If photo download fails, task is marked failed without crashing.""" """If photo download fails, task is marked failed without crashing."""
params = json.dumps({ task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS)
"listing_id": 1,
"photo_url": "https://example.com/bad.jpg",
"listing_title": "Laptop",
})
task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=params)
with patch("app.tasks.runner.requests") as mock_req: with patch("app.tasks.runner.requests") as mock_req:
mock_req.get.side_effect = ConnectionError("fetch failed") mock_req.get.side_effect = ConnectionError("fetch failed")
run_task(tmp_db, task_id, "trust_photo_analysis", 1, params) run_task(tmp_db, task_id, "trust_photo_analysis", 1, _PARAMS)
conn = sqlite3.connect(tmp_db) conn = sqlite3.connect(tmp_db)
row = conn.execute( row = conn.execute(
@ -156,3 +153,169 @@ def test_run_task_unknown_type_marks_failed(tmp_db: Path):
).fetchone() ).fetchone()
conn.close() conn.close()
assert row[0] == "failed" assert row[0] == "failed"
# ── cf-orch path ──────────────────────────────────────────────────────────────
def _make_orch_client_mock(vision_json: str) -> MagicMock:
"""Build a CFOrchClient mock whose task_allocate context manager returns an Allocation."""
alloc = MagicMock()
alloc.url = "http://cf-vlm.local:8000"
alloc.model = "bartowski--qwen2-vl-7b-instruct-gguf"
cm = MagicMock()
cm.__enter__ = MagicMock(return_value=alloc)
cm.__exit__ = MagicMock(return_value=False)
client = MagicMock()
client.task_allocate.return_value = cm
return client
def test_run_task_photo_analysis_orch_success(tmp_db: Path):
"""Cloud path: CFOrchClient.task_allocate is used when CF_ORCH_URL is set."""
task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS)
chat_resp = MagicMock()
chat_resp.json.return_value = {"choices": [{"message": {"content": _VISION_JSON}}]}
chat_resp.raise_for_status = MagicMock()
with patch("app.tasks.runner.requests") as mock_req, \
patch.dict("os.environ", {"CF_ORCH_URL": "http://cf-orch.local:8700"}), \
patch("app.tasks.runner.httpx") as mock_httpx, \
patch("circuitforge_orch.client.CFOrchClient") as MockClient:
mock_req.get.return_value.content = b"fake_image_bytes"
mock_req.get.return_value.raise_for_status = lambda: None
mock_httpx.post.return_value = chat_resp
client_instance = _make_orch_client_mock(_VISION_JSON)
MockClient.return_value = client_instance
run_task(tmp_db, task_id, "trust_photo_analysis", 1, _PARAMS)
conn = sqlite3.connect(tmp_db)
score_row = conn.execute(
"SELECT photo_analysis_json FROM trust_scores WHERE listing_id=1"
).fetchone()
task_row = conn.execute(
"SELECT status FROM background_tasks WHERE id=?", (task_id,)
).fetchone()
conn.close()
assert task_row[0] == "completed"
parsed = json.loads(score_row[0])
assert parsed["authenticity_signal"] == "genuine_product_photo"
def test_run_task_photo_analysis_orch_uses_image_assessment_task(tmp_db: Path):
"""task_allocate must be called with product='snipe', task='image_assessment'."""
task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS)
chat_resp = MagicMock()
chat_resp.json.return_value = {"choices": [{"message": {"content": _VISION_JSON}}]}
chat_resp.raise_for_status = MagicMock()
with patch("app.tasks.runner.requests") as mock_req, \
patch.dict("os.environ", {"CF_ORCH_URL": "http://cf-orch.local:8700"}), \
patch("app.tasks.runner.httpx") as mock_httpx, \
patch("circuitforge_orch.client.CFOrchClient") as MockClient:
mock_req.get.return_value.content = b"fake_image_bytes"
mock_req.get.return_value.raise_for_status = lambda: None
mock_httpx.post.return_value = chat_resp
client_instance = _make_orch_client_mock(_VISION_JSON)
MockClient.return_value = client_instance
run_task(tmp_db, task_id, "trust_photo_analysis", 1, _PARAMS)
client_instance.task_allocate.assert_called_once_with("snipe", "image_assessment")
def test_run_task_photo_analysis_orch_sends_image_url_content(tmp_db: Path):
"""Vision payload must include image_url content block with data URI."""
task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS)
captured_body: dict = {}
def capture_post(url, **kwargs):
nonlocal captured_body
if "/v1/chat/completions" in url:
captured_body = kwargs.get("json", {})
resp = MagicMock()
resp.json.return_value = {"choices": [{"message": {"content": _VISION_JSON}}]}
resp.raise_for_status = MagicMock()
return resp
with patch("app.tasks.runner.requests") as mock_req, \
patch.dict("os.environ", {"CF_ORCH_URL": "http://cf-orch.local:8700"}), \
patch("app.tasks.runner.httpx") as mock_httpx, \
patch("circuitforge_orch.client.CFOrchClient") as MockClient:
mock_req.get.return_value.content = b"fake_image_bytes"
mock_req.get.return_value.raise_for_status = lambda: None
mock_httpx.post.side_effect = capture_post
client_instance = _make_orch_client_mock(_VISION_JSON)
MockClient.return_value = client_instance
run_task(tmp_db, task_id, "trust_photo_analysis", 1, _PARAMS)
user_content = captured_body["messages"][1]["content"]
image_blocks = [b for b in user_content if b.get("type") == "image_url"]
assert image_blocks, "No image_url content block found in vision payload"
url = image_blocks[0]["image_url"]["url"]
assert url.startswith("data:image/jpeg;base64,"), f"Unexpected image URL format: {url[:40]}"
def test_run_task_photo_analysis_orch_task_not_found_falls_back(tmp_db: Path):
"""TaskNotFound from cf-orch → graceful fallback to local LLMRouter."""
from circuitforge_orch.client import TaskNotFound
task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS)
cm = MagicMock()
cm.__enter__ = MagicMock(side_effect=TaskNotFound("snipe", "image_assessment"))
cm.__exit__ = MagicMock(return_value=False)
client_instance = MagicMock()
client_instance.task_allocate.return_value = cm
with patch("app.tasks.runner.requests") as mock_req, \
patch.dict("os.environ", {"CF_ORCH_URL": "http://cf-orch.local:8700"}), \
patch("circuitforge_orch.client.CFOrchClient", return_value=client_instance), \
patch("app.tasks.runner._assess_via_local_llm", return_value=_VISION_JSON) as mock_local:
mock_req.get.return_value.content = b"fake_image_bytes"
mock_req.get.return_value.raise_for_status = lambda: None
run_task(tmp_db, task_id, "trust_photo_analysis", 1, _PARAMS)
mock_local.assert_called_once()
conn = sqlite3.connect(tmp_db)
task_row = conn.execute(
"SELECT status FROM background_tasks WHERE id=?", (task_id,)
).fetchone()
conn.close()
assert task_row[0] == "completed"
def test_run_task_photo_analysis_non_json_response_writes_raw(tmp_db: Path):
"""Non-JSON LLM response is stored with parse_error flag rather than crashing."""
task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS)
with patch("app.tasks.runner.requests") as mock_req, \
patch("app.tasks.runner._assess_via_local_llm", return_value="not valid json at all"):
mock_req.get.return_value.content = b"fake_image_bytes"
mock_req.get.return_value.raise_for_status = lambda: None
run_task(tmp_db, task_id, "trust_photo_analysis", 1, _PARAMS)
conn = sqlite3.connect(tmp_db)
score_row = conn.execute(
"SELECT photo_analysis_json FROM trust_scores WHERE listing_id=1"
).fetchone()
conn.close()
parsed = json.loads(score_row[0])
assert parsed.get("parse_error") is True
assert "raw_response" in parsed