"""Tests for survey endpoints: vision health, async analyze task queue, save response, history.""" import json import sqlite3 import pytest from unittest.mock import patch, MagicMock from fastapi.testclient import TestClient from scripts.db_migrate import migrate_db @pytest.fixture def fresh_db(tmp_path, monkeypatch): """Isolated DB + dev_api wired to it via _request_db and DB_PATH.""" db = tmp_path / "test.db" migrate_db(db) monkeypatch.setenv("STAGING_DB", str(db)) import dev_api monkeypatch.setattr(dev_api, "DB_PATH", str(db)) monkeypatch.setattr( dev_api, "_request_db", type("CV", (), {"get": lambda self: str(db), "set": lambda *a: None})(), ) return db @pytest.fixture def client(fresh_db): import dev_api return TestClient(dev_api.app) # ── GET /api/vision/health ──────────────────────────────────────────────────── def test_vision_health_available(client): """Returns available=true when vision service responds 200.""" mock_resp = MagicMock() mock_resp.status_code = 200 with patch("dev_api.requests.get", return_value=mock_resp): resp = client.get("/api/vision/health") assert resp.status_code == 200 assert resp.json() == {"available": True} def test_vision_health_unavailable(client): """Returns available=false when vision service times out or errors.""" with patch("dev_api.requests.get", side_effect=Exception("timeout")): resp = client.get("/api/vision/health") assert resp.status_code == 200 assert resp.json() == {"available": False} # ── POST /api/jobs/{id}/survey/analyze ─────────────────────────────────────── def test_analyze_queues_task_and_returns_task_id(client): """POST analyze queues a background task and returns task_id + is_new.""" with patch("scripts.task_runner.submit_task", return_value=(42, True)) as mock_submit: resp = client.post("/api/jobs/1/survey/analyze", json={ "text": "Q1: Do you prefer teamwork?\nA. Solo B. Together", "mode": "quick", }) assert resp.status_code == 200 data = resp.json() assert data["task_id"] == 42 assert data["is_new"] is True # submit_task called with survey_analyze type call_kwargs = mock_submit.call_args assert call_kwargs.kwargs["task_type"] == "survey_analyze" assert call_kwargs.kwargs["job_id"] == 1 params = json.loads(call_kwargs.kwargs["params"]) assert params["mode"] == "quick" assert params["text"] == "Q1: Do you prefer teamwork?\nA. Solo B. Together" def test_analyze_silently_attaches_to_existing_task(client): """is_new=False when task already running for same input.""" with patch("scripts.task_runner.submit_task", return_value=(7, False)): resp = client.post("/api/jobs/1/survey/analyze", json={ "text": "Q1: test", "mode": "quick", }) assert resp.status_code == 200 assert resp.json()["is_new"] is False def test_analyze_invalid_mode_returns_400(client): resp = client.post("/api/jobs/1/survey/analyze", json={"text": "Q1: test", "mode": "wrong"}) assert resp.status_code == 400 def test_analyze_image_mode_passes_image_in_params(client): """Image payload is forwarded in task params.""" with patch("scripts.task_runner.submit_task", return_value=(1, True)) as mock_submit: resp = client.post("/api/jobs/1/survey/analyze", json={ "image_b64": "aGVsbG8=", "mode": "quick", }) assert resp.status_code == 200 params = json.loads(mock_submit.call_args.kwargs["params"]) assert params["image_b64"] == "aGVsbG8=" assert params["text"] is None # ── GET /api/jobs/{id}/survey/analyze/task ──────────────────────────────────── def test_task_poll_completed_text(client, fresh_db): """Completed task with text result returns parsed source + output.""" result_json = json.dumps({"output": "1. B — best option", "source": "text_paste"}) con = sqlite3.connect(fresh_db) con.execute( "INSERT INTO background_tasks (task_type, job_id, status, error) VALUES (?,?,?,?)", ("survey_analyze", 1, "completed", result_json), ) task_id = con.execute("SELECT last_insert_rowid()").fetchone()[0] con.commit(); con.close() resp = client.get(f"/api/jobs/1/survey/analyze/task?task_id={task_id}") assert resp.status_code == 200 data = resp.json() assert data["status"] == "completed" assert data["result"]["source"] == "text_paste" assert "B" in data["result"]["output"] assert data["message"] is None def test_task_poll_completed_screenshot(client, fresh_db): """Completed task with image result returns source=screenshot.""" result_json = json.dumps({"output": "1. C — collaborative", "source": "screenshot"}) con = sqlite3.connect(fresh_db) con.execute( "INSERT INTO background_tasks (task_type, job_id, status, error) VALUES (?,?,?,?)", ("survey_analyze", 1, "completed", result_json), ) task_id = con.execute("SELECT last_insert_rowid()").fetchone()[0] con.commit(); con.close() resp = client.get(f"/api/jobs/1/survey/analyze/task?task_id={task_id}") assert resp.status_code == 200 assert resp.json()["result"]["source"] == "screenshot" def test_task_poll_failed_returns_message(client, fresh_db): """Failed task returns status=failed with error message.""" con = sqlite3.connect(fresh_db) con.execute( "INSERT INTO background_tasks (task_type, job_id, status, error) VALUES (?,?,?,?)", ("survey_analyze", 1, "failed", "LLM unavailable"), ) task_id = con.execute("SELECT last_insert_rowid()").fetchone()[0] con.commit(); con.close() resp = client.get(f"/api/jobs/1/survey/analyze/task?task_id={task_id}") assert resp.status_code == 200 data = resp.json() assert data["status"] == "failed" assert data["message"] == "LLM unavailable" assert data["result"] is None def test_task_poll_running_returns_stage(client, fresh_db): """Running task returns status=running with current stage.""" con = sqlite3.connect(fresh_db) con.execute( "INSERT INTO background_tasks (task_type, job_id, status, stage) VALUES (?,?,?,?)", ("survey_analyze", 1, "running", "analyzing survey"), ) task_id = con.execute("SELECT last_insert_rowid()").fetchone()[0] con.commit(); con.close() resp = client.get(f"/api/jobs/1/survey/analyze/task?task_id={task_id}") assert resp.status_code == 200 data = resp.json() assert data["status"] == "running" assert data["stage"] == "analyzing survey" def test_task_poll_none_when_no_task(client): """Returns status=none when no task exists for the job.""" resp = client.get("/api/jobs/999/survey/analyze/task") assert resp.status_code == 200 assert resp.json()["status"] == "none" # ── POST /api/jobs/{id}/survey/responses ───────────────────────────────────── def test_save_response_text(client): """Save a text-mode survey response returns an id.""" resp = client.post("/api/jobs/1/survey/responses", json={ "survey_name": "Culture Fit", "mode": "quick", "source": "text_paste", "raw_input": "Q1: Teamwork?", "llm_output": "1. B is best", "reported_score": "85", }) assert resp.status_code == 200 assert "id" in resp.json() def test_save_response_with_image(client): """Save a screenshot-mode survey response returns an id.""" resp = client.post("/api/jobs/1/survey/responses", json={ "survey_name": None, "mode": "quick", "source": "screenshot", "image_b64": "aGVsbG8=", "llm_output": "1. C collaborative", "reported_score": None, }) assert resp.status_code == 200 assert "id" in resp.json() def test_get_history_empty(client): """History is empty for a fresh job.""" resp = client.get("/api/jobs/1/survey/responses") assert resp.status_code == 200 assert resp.json() == [] def test_get_history_populated(client): """History returns all saved responses for a job in reverse order.""" for i in range(2): client.post("/api/jobs/1/survey/responses", json={ "survey_name": f"Survey {i}", "mode": "quick", "source": "text_paste", "llm_output": f"Output {i}", }) resp = client.get("/api/jobs/1/survey/responses") assert resp.status_code == 200 assert len(resp.json()) == 2