diff --git a/app/tasks/runner.py b/app/tasks/runner.py index 2b41b4c..5b6b1c2 100644 --- a/app/tasks/runner.py +++ b/app/tasks/runner.py @@ -7,28 +7,30 @@ Current task types: trust_photo_analysis — download primary photo, run vision LLM, write result to trust_scores.photo_analysis_json (Paid tier). -Prompt note: The vision prompt is a functional first pass. Tune against real -eBay listings before GA — specifically stock-photo vs genuine-product distinction -and the damage vocabulary. +Image assessment routing: + Cloud (CF_ORCH_URL set): allocates via cf-orch task endpoint + product=snipe, task=image_assessment. + Local (no CF_ORCH_URL) or TaskNotFound fallback: uses LLMRouter + with a vision-capable local backend (moondream2, llava, etc.). """ from __future__ import annotations import base64 import json import logging +import os from pathlib import Path +import httpx import requests from circuitforge_core.db import get_connection -from circuitforge_core.llm import LLMRouter log = logging.getLogger(__name__) LLM_TASK_TYPES: frozenset[str] = frozenset({"trust_photo_analysis"}) VRAM_BUDGETS: dict[str, float] = { - # moondream2 / vision-capable LLM — single image, short response - "trust_photo_analysis": 2.0, + "trust_photo_analysis": 6000, # Q5_K_M Qwen2-VL via cf-orch; LLMRouter fallback uses 2.0 GB } _VISION_SYSTEM_PROMPT = ( @@ -51,8 +53,7 @@ def insert_task( ) -> tuple[int, bool]: """Insert a background task if no identical task is already in-flight. - Uses get_connection() so WAL mode and timeout=30 apply — same as all other - Snipe DB access. Returns (task_id, is_new). + Returns (task_id, is_new). """ conn = get_connection(db_path) conn.row_factory = __import__("sqlite3").Row @@ -120,32 +121,26 @@ def _run_trust_photo_analysis( p = json.loads(params or "{}") photo_url = p.get("photo_url", "") listing_title = p.get("listing_title", "") - # user_db: per-user DB in cloud mode; same as db_path in local mode. result_db = Path(p.get("user_db", str(db_path))) if not photo_url: raise ValueError("trust_photo_analysis: 'photo_url' is required in params") - # Download and base64-encode the photo resp = requests.get(photo_url, timeout=10) resp.raise_for_status() image_b64 = base64.b64encode(resp.content).decode() + image_data_url = f"data:image/jpeg;base64,{image_b64}" - # Build user prompt with optional title context - user_prompt = "Evaluate this eBay listing photo." + user_prompt = "Assess this listing image." if listing_title: - user_prompt = f"Evaluate this eBay listing photo for: {listing_title}" + user_prompt = f"Assess this eBay listing image: {listing_title}" - # Call LLMRouter with vision capability - router = LLMRouter() - raw = router.complete( - user_prompt, - system=_VISION_SYSTEM_PROMPT, - images=[image_b64], - max_tokens=128, - ) + cforch_url = os.getenv("CF_ORCH_URL") + if cforch_url: + raw = _assess_via_orch(cforch_url, image_data_url, user_prompt) + else: + raw = _assess_via_local_llm(image_b64, user_prompt) - # Parse — be lenient: strip markdown fences if present try: cleaned = raw.strip().removeprefix("```json").removeprefix("```").removesuffix("```").strip() analysis = json.loads(cleaned) @@ -168,3 +163,54 @@ def _run_trust_photo_analysis( analysis.get("visible_damage"), analysis.get("confidence"), ) + + +def _assess_via_orch(cforch_url: str, image_data_url: str, user_prompt: str) -> str: + """Run photo assessment via cf-orch task endpoint (cloud path).""" + from circuitforge_orch.client import CFOrchClient, TaskNotFound + + client = CFOrchClient(cforch_url) + try: + with client.task_allocate("snipe", "image_assessment") as alloc: + resp = httpx.post( + f"{alloc.url}/v1/chat/completions", + json={ + "model": alloc.model or "__auto__", + "messages": [ + { + "role": "system", + "content": _VISION_SYSTEM_PROMPT, + }, + { + "role": "user", + "content": [ + {"type": "image_url", "image_url": {"url": image_data_url}}, + {"type": "text", "text": user_prompt}, + ], + }, + ], + "max_tokens": 128, + }, + timeout=60.0, + ) + resp.raise_for_status() + return resp.json()["choices"][0]["message"]["content"] + except TaskNotFound: + log.warning( + "snipe.image_assessment not registered in cf-orch — falling back to local LLM" + ) + image_b64 = image_data_url.split(",", 1)[1] + return _assess_via_local_llm(image_b64, user_prompt) + + +def _assess_via_local_llm(image_b64: str, user_prompt: str) -> str: + """Run photo assessment via local LLMRouter (local/self-hosted path).""" + from app.llm.router import LLMRouter + + router = LLMRouter() + return router.complete( + user_prompt, + system=_VISION_SYSTEM_PROMPT, + images=[image_b64], + max_tokens=128, + ) diff --git a/pyproject.toml b/pyproject.toml index 3af54b3..956a8e0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,6 +24,7 @@ dependencies = [ "cryptography>=42.0", "PyJWT>=2.8", "httpx>=0.27", + "circuitforge-orch>=0.1.0", ] [project.optional-dependencies] diff --git a/tests/test_tasks/test_runner.py b/tests/test_tasks/test_runner.py index c9c13db..c593870 100644 --- a/tests/test_tasks/test_runner.py +++ b/tests/test_tasks/test_runner.py @@ -4,7 +4,7 @@ from __future__ import annotations import json import sqlite3 from pathlib import Path -from unittest.mock import patch +from unittest.mock import MagicMock, patch, call import pytest @@ -47,6 +47,19 @@ def tmp_db(tmp_path: Path) -> Path: return db +_VISION_JSON = json.dumps({ + "is_stock_photo": False, + "visible_damage": False, + "authenticity_signal": "genuine_product_photo", + "confidence": "high", +}) + +_PARAMS = json.dumps({ + "photo_url": "https://example.com/photo.jpg", + "listing_title": "Used iPhone 13", +}) + + def test_llm_task_types_defined(): assert "trust_photo_analysis" in LLM_TASK_TYPES @@ -75,29 +88,17 @@ def test_insert_task_dedup(tmp_db: Path): assert new2 is False -def test_run_task_photo_analysis_success(tmp_db: Path): - """Vision analysis result is written to trust_scores.photo_analysis_json.""" - params = json.dumps({ - "listing_id": 1, - "photo_url": "https://example.com/photo.jpg", - "listing_title": "Used iPhone 13", - }) - task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=params) +# ── Local LLMRouter path ────────────────────────────────────────────────────── - vision_result = { - "is_stock_photo": False, - "visible_damage": False, - "authenticity_signal": "genuine_product_photo", - "confidence": "high", - } +def test_run_task_photo_analysis_local_success(tmp_db: Path): + """Local path: vision result is written to trust_scores.photo_analysis_json.""" + task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS) with patch("app.tasks.runner.requests") as mock_req, \ - patch("app.tasks.runner.LLMRouter") as MockRouter: + patch("app.tasks.runner._assess_via_local_llm", return_value=_VISION_JSON): mock_req.get.return_value.content = b"fake_image_bytes" mock_req.get.return_value.raise_for_status = lambda: None - instance = MockRouter.return_value - instance.complete.return_value = json.dumps(vision_result) - run_task(tmp_db, task_id, "trust_photo_analysis", 1, params) + run_task(tmp_db, task_id, "trust_photo_analysis", 1, _PARAMS) conn = sqlite3.connect(tmp_db) score_row = conn.execute( @@ -110,20 +111,16 @@ def test_run_task_photo_analysis_success(tmp_db: Path): assert task_row[0] == "completed" parsed = json.loads(score_row[0]) assert parsed["is_stock_photo"] is False + assert parsed["confidence"] == "high" def test_run_task_photo_fetch_failure_marks_failed(tmp_db: Path): """If photo download fails, task is marked failed without crashing.""" - params = json.dumps({ - "listing_id": 1, - "photo_url": "https://example.com/bad.jpg", - "listing_title": "Laptop", - }) - task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=params) + task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS) with patch("app.tasks.runner.requests") as mock_req: mock_req.get.side_effect = ConnectionError("fetch failed") - run_task(tmp_db, task_id, "trust_photo_analysis", 1, params) + run_task(tmp_db, task_id, "trust_photo_analysis", 1, _PARAMS) conn = sqlite3.connect(tmp_db) row = conn.execute( @@ -156,3 +153,169 @@ def test_run_task_unknown_type_marks_failed(tmp_db: Path): ).fetchone() conn.close() assert row[0] == "failed" + + +# ── cf-orch path ────────────────────────────────────────────────────────────── + +def _make_orch_client_mock(vision_json: str) -> MagicMock: + """Build a CFOrchClient mock whose task_allocate context manager returns an Allocation.""" + alloc = MagicMock() + alloc.url = "http://cf-vlm.local:8000" + alloc.model = "bartowski--qwen2-vl-7b-instruct-gguf" + + cm = MagicMock() + cm.__enter__ = MagicMock(return_value=alloc) + cm.__exit__ = MagicMock(return_value=False) + + client = MagicMock() + client.task_allocate.return_value = cm + return client + + +def test_run_task_photo_analysis_orch_success(tmp_db: Path): + """Cloud path: CFOrchClient.task_allocate is used when CF_ORCH_URL is set.""" + task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS) + + chat_resp = MagicMock() + chat_resp.json.return_value = {"choices": [{"message": {"content": _VISION_JSON}}]} + chat_resp.raise_for_status = MagicMock() + + with patch("app.tasks.runner.requests") as mock_req, \ + patch.dict("os.environ", {"CF_ORCH_URL": "http://cf-orch.local:8700"}), \ + patch("app.tasks.runner.httpx") as mock_httpx, \ + patch("circuitforge_orch.client.CFOrchClient") as MockClient: + + mock_req.get.return_value.content = b"fake_image_bytes" + mock_req.get.return_value.raise_for_status = lambda: None + mock_httpx.post.return_value = chat_resp + + client_instance = _make_orch_client_mock(_VISION_JSON) + MockClient.return_value = client_instance + + run_task(tmp_db, task_id, "trust_photo_analysis", 1, _PARAMS) + + conn = sqlite3.connect(tmp_db) + score_row = conn.execute( + "SELECT photo_analysis_json FROM trust_scores WHERE listing_id=1" + ).fetchone() + task_row = conn.execute( + "SELECT status FROM background_tasks WHERE id=?", (task_id,) + ).fetchone() + conn.close() + assert task_row[0] == "completed" + parsed = json.loads(score_row[0]) + assert parsed["authenticity_signal"] == "genuine_product_photo" + + +def test_run_task_photo_analysis_orch_uses_image_assessment_task(tmp_db: Path): + """task_allocate must be called with product='snipe', task='image_assessment'.""" + task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS) + + chat_resp = MagicMock() + chat_resp.json.return_value = {"choices": [{"message": {"content": _VISION_JSON}}]} + chat_resp.raise_for_status = MagicMock() + + with patch("app.tasks.runner.requests") as mock_req, \ + patch.dict("os.environ", {"CF_ORCH_URL": "http://cf-orch.local:8700"}), \ + patch("app.tasks.runner.httpx") as mock_httpx, \ + patch("circuitforge_orch.client.CFOrchClient") as MockClient: + + mock_req.get.return_value.content = b"fake_image_bytes" + mock_req.get.return_value.raise_for_status = lambda: None + mock_httpx.post.return_value = chat_resp + + client_instance = _make_orch_client_mock(_VISION_JSON) + MockClient.return_value = client_instance + + run_task(tmp_db, task_id, "trust_photo_analysis", 1, _PARAMS) + + client_instance.task_allocate.assert_called_once_with("snipe", "image_assessment") + + +def test_run_task_photo_analysis_orch_sends_image_url_content(tmp_db: Path): + """Vision payload must include image_url content block with data URI.""" + task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS) + + captured_body: dict = {} + + def capture_post(url, **kwargs): + nonlocal captured_body + if "/v1/chat/completions" in url: + captured_body = kwargs.get("json", {}) + resp = MagicMock() + resp.json.return_value = {"choices": [{"message": {"content": _VISION_JSON}}]} + resp.raise_for_status = MagicMock() + return resp + + with patch("app.tasks.runner.requests") as mock_req, \ + patch.dict("os.environ", {"CF_ORCH_URL": "http://cf-orch.local:8700"}), \ + patch("app.tasks.runner.httpx") as mock_httpx, \ + patch("circuitforge_orch.client.CFOrchClient") as MockClient: + + mock_req.get.return_value.content = b"fake_image_bytes" + mock_req.get.return_value.raise_for_status = lambda: None + mock_httpx.post.side_effect = capture_post + + client_instance = _make_orch_client_mock(_VISION_JSON) + MockClient.return_value = client_instance + + run_task(tmp_db, task_id, "trust_photo_analysis", 1, _PARAMS) + + user_content = captured_body["messages"][1]["content"] + image_blocks = [b for b in user_content if b.get("type") == "image_url"] + assert image_blocks, "No image_url content block found in vision payload" + url = image_blocks[0]["image_url"]["url"] + assert url.startswith("data:image/jpeg;base64,"), f"Unexpected image URL format: {url[:40]}" + + +def test_run_task_photo_analysis_orch_task_not_found_falls_back(tmp_db: Path): + """TaskNotFound from cf-orch → graceful fallback to local LLMRouter.""" + from circuitforge_orch.client import TaskNotFound + + task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS) + + cm = MagicMock() + cm.__enter__ = MagicMock(side_effect=TaskNotFound("snipe", "image_assessment")) + cm.__exit__ = MagicMock(return_value=False) + + client_instance = MagicMock() + client_instance.task_allocate.return_value = cm + + with patch("app.tasks.runner.requests") as mock_req, \ + patch.dict("os.environ", {"CF_ORCH_URL": "http://cf-orch.local:8700"}), \ + patch("circuitforge_orch.client.CFOrchClient", return_value=client_instance), \ + patch("app.tasks.runner._assess_via_local_llm", return_value=_VISION_JSON) as mock_local: + + mock_req.get.return_value.content = b"fake_image_bytes" + mock_req.get.return_value.raise_for_status = lambda: None + + run_task(tmp_db, task_id, "trust_photo_analysis", 1, _PARAMS) + + mock_local.assert_called_once() + + conn = sqlite3.connect(tmp_db) + task_row = conn.execute( + "SELECT status FROM background_tasks WHERE id=?", (task_id,) + ).fetchone() + conn.close() + assert task_row[0] == "completed" + + +def test_run_task_photo_analysis_non_json_response_writes_raw(tmp_db: Path): + """Non-JSON LLM response is stored with parse_error flag rather than crashing.""" + task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=_PARAMS) + + with patch("app.tasks.runner.requests") as mock_req, \ + patch("app.tasks.runner._assess_via_local_llm", return_value="not valid json at all"): + mock_req.get.return_value.content = b"fake_image_bytes" + mock_req.get.return_value.raise_for_status = lambda: None + run_task(tmp_db, task_id, "trust_photo_analysis", 1, _PARAMS) + + conn = sqlite3.connect(tmp_db) + score_row = conn.execute( + "SELECT photo_analysis_json FROM trust_scores WHERE listing_id=1" + ).fetchone() + conn.close() + parsed = json.loads(score_row[0]) + assert parsed.get("parse_error") is True + assert "raw_response" in parsed