snipe/tests/test_tasks/test_runner.py
pyr0ball fb81422c54 feat: snipe beta backlog batch (tickets #22/#28/#30/#34/#35/#36/#37/#38)
Cloud/session:
- fix(_extract_session_token): return "" for non-JWT cookie strings (snipe_guest=uuid was
  triggering 401 → forced login redirect for all unauthenticated cloud visitors)
- fix(affiliate): exclude guest: and anonymous users from pref-store writes (#38)
- fix(market-comp): use enriched comp_query for market comp hash so write/read keys match (#30)

Frontend:
- feat(SearchView): unauthenticated landing strip with free-account CTA (#36)
- feat(SearchView): aria-pressed on filter toggles, aria-label on icon buttons, focus-visible
  rings on all interactive controls, live region for result count (#35)
- feat(SearchView): no-results empty-state hint text (#36)
- feat(SEO): og:image 1200x630, summary_large_image twitter card, canonical link (#37)
- feat(OG): generated og-image.png (dark tactical theme, feature pills) (#37)
- feat(settings): TrustSignalPref view wired to /settings route (#28)
- fix(router): /settings route added; unauthenticated access redirects to home (#34)

CI/CD:
- feat(ci): Forgejo Actions workflow (ruff + pytest + vue-tsc + vitest) (#22)
- feat(ci): mirror workflow (GitHub + Codeberg on push to main/tags) (#22)
- feat(ci): release workflow (Docker build+push + git-cliff changelog) (#22)
- chore: git-cliff config (.cliff.toml) for conventional commit changelog (#22)
- chore(pyproject): dev extras (pytest/ruff/httpx), ruff config with ignore list (#22)

Lint:
- fix: remove 11 unused imports across api/, app/, tests/ (ruff F401 clean)
2026-04-13 19:32:50 -07:00

158 lines
5.2 KiB
Python

"""Tests for snipe background task runner."""
from __future__ import annotations
import json
import sqlite3
from pathlib import Path
from unittest.mock import patch
import pytest
from app.tasks.runner import (
LLM_TASK_TYPES,
VRAM_BUDGETS,
insert_task,
run_task,
)
@pytest.fixture
def tmp_db(tmp_path: Path) -> Path:
    """Create a throwaway SQLite database for the runner tests.

    The file is created at ``tmp_path / "snipe.db"`` and holds a
    ``background_tasks`` table, a ``trust_scores`` table, and one seeded
    ``trust_scores`` row (``listing_id=1``, ``composite_score=72``).

    Args:
        tmp_path: Directory in which the database file is created.

    Returns:
        Path to the created database file.
    """
    db = tmp_path / "snipe.db"
    conn = sqlite3.connect(db)
    try:
        conn.executescript("""
CREATE TABLE background_tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_type TEXT NOT NULL,
job_id INTEGER NOT NULL DEFAULT 0,
status TEXT NOT NULL DEFAULT 'queued',
params TEXT,
error TEXT,
stage TEXT,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE trust_scores (
id INTEGER PRIMARY KEY AUTOINCREMENT,
listing_id INTEGER NOT NULL,
composite_score INTEGER NOT NULL DEFAULT 0,
photo_analysis_json TEXT,
red_flags_json TEXT NOT NULL DEFAULT '[]',
scored_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO trust_scores (listing_id, composite_score) VALUES (1, 72);
""")
        conn.commit()
    finally:
        # Close the handle even when schema creation fails, so a broken
        # fixture does not leave an open connection on the temp file.
        conn.close()
    return db
def test_llm_task_types_defined():
    """LLM_TASK_TYPES contains the photo-analysis task type."""
    expected_type = "trust_photo_analysis"
    assert expected_type in LLM_TASK_TYPES
def test_vram_budgets_defined():
    """VRAM_BUDGETS has a positive entry for the photo-analysis task type."""
    task_type = "trust_photo_analysis"
    assert task_type in VRAM_BUDGETS
    budget = VRAM_BUDGETS[task_type]
    assert budget > 0
def test_insert_task_creates_row(tmp_db: Path):
    """insert_task reports a new task and stores it with status 'queued'."""
    task_id, is_new = insert_task(tmp_db, "trust_photo_analysis", job_id=1)
    assert is_new is True

    conn = sqlite3.connect(tmp_db)
    try:
        row = conn.execute(
            "SELECT status FROM background_tasks WHERE id=?", (task_id,)
        ).fetchone()
    finally:
        # Release the handle even if the query itself raises.
        conn.close()

    # Fail with a readable message instead of a TypeError on row[0].
    assert row is not None, f"no background_tasks row for id {task_id}"
    assert row[0] == "queued"
def test_insert_task_dedup(tmp_db: Path):
    """A repeated identical insert returns the first task's id and is not new."""
    # Issue the same insert twice, in order, and unpack both results.
    outcomes = [
        insert_task(tmp_db, "trust_photo_analysis", job_id=1)
        for _ in range(2)
    ]
    (first_id, first_is_new), (second_id, second_is_new) = outcomes

    assert first_id == second_id
    assert first_is_new is True
    assert second_is_new is False
def test_run_task_photo_analysis_success(tmp_db: Path):
    """Vision analysis result is written to trust_scores.photo_analysis_json."""
    params = json.dumps({
        "listing_id": 1,
        "photo_url": "https://example.com/photo.jpg",
        "listing_title": "Used iPhone 13",
    })
    task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=params)
    vision_result = {
        "is_stock_photo": False,
        "visible_damage": False,
        "authenticity_signal": "genuine_product_photo",
        "confidence": "high",
    }

    # Replace the runner's `requests` and `LLMRouter` names with mocks; the
    # task must run while both patches are active.
    with patch("app.tasks.runner.requests") as mock_req, \
            patch("app.tasks.runner.LLMRouter") as MockRouter:
        mock_req.get.return_value.content = b"fake_image_bytes"
        mock_req.get.return_value.raise_for_status = lambda: None
        instance = MockRouter.return_value
        instance.complete.return_value = json.dumps(vision_result)
        run_task(tmp_db, task_id, "trust_photo_analysis", 1, params)

    conn = sqlite3.connect(tmp_db)
    try:
        score_row = conn.execute(
            "SELECT photo_analysis_json FROM trust_scores WHERE listing_id=1"
        ).fetchone()
        task_row = conn.execute(
            "SELECT status FROM background_tasks WHERE id=?", (task_id,)
        ).fetchone()
    finally:
        # Release the handle even if one of the queries raises.
        conn.close()

    # Fail with readable messages instead of a TypeError on indexing None.
    assert task_row is not None, f"no background_tasks row for id {task_id}"
    assert task_row[0] == "completed"
    assert score_row is not None, "no trust_scores row for listing_id 1"
    parsed = json.loads(score_row[0])
    assert parsed["is_stock_photo"] is False
def test_run_task_photo_fetch_failure_marks_failed(tmp_db: Path):
    """If photo download fails, task is marked failed without crashing."""
    params = json.dumps({
        "listing_id": 1,
        "photo_url": "https://example.com/bad.jpg",
        "listing_title": "Laptop",
    })
    task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=params)

    # Make the mocked `requests.get` raise so the download step fails.
    with patch("app.tasks.runner.requests") as mock_req:
        mock_req.get.side_effect = ConnectionError("fetch failed")
        run_task(tmp_db, task_id, "trust_photo_analysis", 1, params)

    conn = sqlite3.connect(tmp_db)
    try:
        row = conn.execute(
            "SELECT status, error FROM background_tasks WHERE id=?", (task_id,)
        ).fetchone()
    finally:
        # Release the handle even if the query itself raises.
        conn.close()

    # Fail with a readable message instead of a TypeError on row[0].
    assert row is not None, f"no background_tasks row for id {task_id}"
    assert row[0] == "failed"
    assert "fetch failed" in row[1]
def test_run_task_no_photo_url_marks_failed(tmp_db: Path):
    """Params without a photo_url leave the task failed with an error naming it."""
    params = json.dumps({"listing_id": 1})
    task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1, params=params)
    run_task(tmp_db, task_id, "trust_photo_analysis", 1, params)

    conn = sqlite3.connect(tmp_db)
    try:
        row = conn.execute(
            "SELECT status, error FROM background_tasks WHERE id=?", (task_id,)
        ).fetchone()
    finally:
        # Release the handle even if the query itself raises.
        conn.close()

    # Fail with a readable message instead of a TypeError on row[0].
    assert row is not None, f"no background_tasks row for id {task_id}"
    assert row[0] == "failed"
    assert "photo_url" in row[1]
def test_run_task_unknown_type_marks_failed(tmp_db: Path):
    """Running a task with the type 'unknown_type' leaves its row marked failed."""
    task_id, _ = insert_task(tmp_db, "trust_photo_analysis", job_id=1)
    # The row is inserted with a known type; only the run_task call passes
    # the unknown one.
    run_task(tmp_db, task_id, "unknown_type", 1, None)

    conn = sqlite3.connect(tmp_db)
    try:
        row = conn.execute(
            "SELECT status FROM background_tasks WHERE id=?", (task_id,)
        ).fetchone()
    finally:
        # Release the handle even if the query itself raises.
        conn.close()

    # Fail with a readable message instead of a TypeError on row[0].
    assert row is not None, f"no background_tasks row for id {task_id}"
    assert row[0] == "failed"