"""Tests for ``scripts.task_runner``.

Covers ``submit_task`` (id allocation, de-duplication, background completion)
and ``_run_task`` for the ``cover_letter``, ``company_research``, ``discovery``,
``email_sync``, ``enrich_craigslist``, ``scrape_url`` and ``wizard_generate``
task types.  Project modules are imported inside each test so that patches
are applied against freshly resolved names.
"""
import json as _json
import sqlite3
import threading
import time
from contextlib import closing
from pathlib import Path
from unittest.mock import patch

import pytest


def _make_db(tmp_path):
    """Create an initialised test DB under *tmp_path* holding a single job.

    Returns:
        A ``(db_path, job_id)`` tuple.
    """
    from scripts.db import init_db, insert_job

    db = tmp_path / "test.db"
    init_db(db)
    job_id = insert_job(db, {
        "title": "CSM",
        "company": "Acme",
        "url": "https://ex.com/1",
        "source": "linkedin",
        "location": "Remote",
        "is_remote": True,
        "salary": "",
        "description": "Great role.",
        "date_found": "2026-02-20",
    })
    return db, job_id


def _fetch_one(db, query, args=()):
    """Run *query* against the SQLite file *db* and return the first row.

    The connection is closed even when the query raises, so a failing test
    does not leave the database file open.
    """
    with closing(sqlite3.connect(db)) as conn:
        return conn.execute(query, args).fetchone()


def test_submit_task_returns_id_and_true(tmp_path):
    """submit_task returns (task_id, True) and spawns a thread."""
    db, job_id = _make_db(tmp_path)
    with patch("scripts.task_runner._run_task"):  # don't actually call LLM
        from scripts.task_runner import submit_task

        task_id, is_new = submit_task(db, "cover_letter", job_id)

    assert isinstance(task_id, int) and task_id > 0
    assert is_new is True


def test_submit_task_deduplicates(tmp_path):
    """submit_task returns (existing_id, False) for a duplicate in-flight task."""
    db, job_id = _make_db(tmp_path)
    with patch("scripts.task_runner._run_task"):
        from scripts.task_runner import submit_task

        first_id, _ = submit_task(db, "cover_letter", job_id)
        second_id, is_new = submit_task(db, "cover_letter", job_id)

    assert second_id == first_id
    assert is_new is False


def test_run_task_cover_letter_success(tmp_path):
    """_run_task marks running→completed and saves cover letter to DB."""
    db, job_id = _make_db(tmp_path)
    from scripts.db import insert_task, get_task_for_job

    task_id, _ = insert_task(db, "cover_letter", job_id)
    with patch(
        "scripts.generate_cover_letter.generate",
        return_value="Dear Hiring Manager,\nGreat fit!",
    ):
        from scripts.task_runner import _run_task

        _run_task(db, task_id, "cover_letter", job_id)

    task = get_task_for_job(db, "cover_letter", job_id)
    assert task["status"] == "completed"
    assert task["error"] is None

    row = _fetch_one(db, "SELECT cover_letter FROM jobs WHERE id=?", (job_id,))
    assert row[0] == "Dear Hiring Manager,\nGreat fit!"


def test_run_task_company_research_success(tmp_path):
    """_run_task marks running→completed and saves research to DB."""
    db, job_id = _make_db(tmp_path)
    from scripts.db import insert_task, get_task_for_job, get_research

    task_id, _ = insert_task(db, "company_research", job_id)
    fake_result = {
        "raw_output": "raw",
        "company_brief": "brief",
        "ceo_brief": "ceo",
        "talking_points": "points",
    }
    with patch("scripts.company_research.research_company", return_value=fake_result):
        from scripts.task_runner import _run_task

        _run_task(db, task_id, "company_research", job_id)

    task = get_task_for_job(db, "company_research", job_id)
    assert task["status"] == "completed"

    research = get_research(db, job_id=job_id)
    assert research["company_brief"] == "brief"


def test_run_task_marks_failed_on_exception(tmp_path):
    """_run_task marks status=failed and stores error when generator raises."""
    db, job_id = _make_db(tmp_path)
    from scripts.db import insert_task, get_task_for_job

    task_id, _ = insert_task(db, "cover_letter", job_id)
    with patch(
        "scripts.generate_cover_letter.generate",
        side_effect=RuntimeError("LLM timeout"),
    ):
        from scripts.task_runner import _run_task

        _run_task(db, task_id, "cover_letter", job_id)

    task = get_task_for_job(db, "cover_letter", job_id)
    assert task["status"] == "failed"
    assert "LLM timeout" in task["error"]


def test_run_task_discovery_success(tmp_path):
    """_run_task with task_type=discovery calls run_discovery and stores count in error field."""
    from scripts.db import init_db, insert_task, get_task_for_job

    db = tmp_path / "test.db"
    init_db(db)
    # Discovery is not tied to a job, so the task is registered with job id 0.
    task_id, _ = insert_task(db, "discovery", 0)
    with patch("scripts.discover.run_discovery", return_value=7):
        from scripts.task_runner import _run_task

        _run_task(db, task_id, "discovery", 0)

    task = get_task_for_job(db, "discovery", 0)
    assert task["status"] == "completed"
    assert "7 new listings" in task["error"]


def test_run_task_email_sync_success(tmp_path):
    """email_sync task calls sync_all and marks completed with summary."""
    db, _ = _make_db(tmp_path)
    from scripts.db import insert_task, get_task_for_job

    task_id, _ = insert_task(db, "email_sync", 0)
    summary = {"synced": 3, "inbound": 5, "outbound": 2, "new_leads": 1, "errors": []}
    with patch("scripts.imap_sync.sync_all", return_value=summary):
        from scripts.task_runner import _run_task

        _run_task(db, task_id, "email_sync", 0)

    task = get_task_for_job(db, "email_sync", 0)
    assert task["status"] == "completed"
    assert "3 jobs" in task["error"]


def test_run_task_email_sync_file_not_found(tmp_path):
    """email_sync marks failed with helpful message when config is missing."""
    db, _ = _make_db(tmp_path)
    from scripts.db import insert_task, get_task_for_job

    task_id, _ = insert_task(db, "email_sync", 0)
    with patch(
        "scripts.imap_sync.sync_all",
        side_effect=FileNotFoundError("config/email.yaml"),
    ):
        from scripts.task_runner import _run_task

        _run_task(db, task_id, "email_sync", 0)

    task = get_task_for_job(db, "email_sync", 0)
    assert task["status"] == "failed"
    assert "email" in task["error"].lower()


def test_submit_task_actually_completes(tmp_path):
    """Integration: submit_task spawns a thread that completes asynchronously."""
    db, job_id = _make_db(tmp_path)
    from scripts.db import get_task_for_job

    # The patch must stay active while the worker thread runs, so the polling
    # loop lives inside the ``with`` block.
    with patch("scripts.generate_cover_letter.generate", return_value="Cover letter text"):
        from scripts.task_runner import submit_task

        submit_task(db, "cover_letter", job_id)

        # Wait for thread to complete (max 5s)
        for _ in range(50):
            task = get_task_for_job(db, "cover_letter", job_id)
            if task and task["status"] in ("completed", "failed"):
                break
            time.sleep(0.1)

    task = get_task_for_job(db, "cover_letter", job_id)
    assert task["status"] == "completed"


def test_run_task_enrich_craigslist_success(tmp_path):
    """enrich_craigslist task calls enrich_craigslist_fields and marks completed."""
    from scripts.db import init_db, insert_job, insert_task, get_task_for_job

    db = tmp_path / "test.db"
    init_db(db)
    job_id = insert_job(db, {
        "title": "CSM",
        "company": "",
        "url": "https://sfbay.craigslist.org/jjj/d/9.html",
        "source": "craigslist",
        "location": "",
        "description": "Join Acme Corp. Pay: $100k.",
        "date_found": "2026-02-24",
    })
    task_id, _ = insert_task(db, "enrich_craigslist", job_id)

    with patch(
        "scripts.enrich_descriptions.enrich_craigslist_fields",
        return_value={"company": "Acme Corp", "salary": "$100k"},
    ) as mock_enrich:
        from scripts.task_runner import _run_task

        _run_task(db, task_id, "enrich_craigslist", job_id)

    mock_enrich.assert_called_once_with(db, job_id)
    task = get_task_for_job(db, "enrich_craigslist", job_id)
    assert task["status"] == "completed"


def test_scrape_url_submits_enrich_craigslist_for_craigslist_job(tmp_path):
    """After scrape_url completes for a craigslist job with empty company, enrich_craigslist is queued."""
    from scripts.db import init_db, insert_job, insert_task

    db = tmp_path / "test.db"
    init_db(db)
    job_id = insert_job(db, {
        "title": "CSM",
        "company": "",
        "url": "https://sfbay.craigslist.org/jjj/d/10.html",
        "source": "craigslist",
        "location": "",
        "description": "",
        "date_found": "2026-02-24",
    })
    task_id, _ = insert_task(db, "scrape_url", job_id)

    with patch(
        "scripts.scrape_url.scrape_job_url",
        return_value={"title": "CSM", "company": ""},
    ):
        # Replace submit_task with a plain mock so the follow-up submission is
        # captured without actually spawning a thread.
        with patch("scripts.task_runner.submit_task") as mock_submit:
            mock_submit.return_value = (99, True)
            from scripts.task_runner import _run_task

            _run_task(db, task_id, "scrape_url", job_id)

    # submit_task should have been called with enrich_craigslist
    assert mock_submit.called
    call_args = mock_submit.call_args
    assert call_args[0][1] == "enrich_craigslist"
    assert call_args[0][2] == job_id


def test_wizard_generate_unknown_section_fails(tmp_path):
    """wizard_generate with unknown section marks task failed."""
    db = tmp_path / "t.db"
    from scripts.db import init_db, insert_task

    init_db(db)
    params = _json.dumps({"section": "nonexistent_section", "input": {}})
    task_id, _ = insert_task(db, "wizard_generate", 0, params=params)

    # Call _run_task directly (not via thread) to test synchronously
    from scripts.task_runner import _run_task

    _run_task(db, task_id, "wizard_generate", 0, params=params)

    row = _fetch_one(
        db, "SELECT status, error FROM background_tasks WHERE id=?", (task_id,)
    )
    assert row[0] == "failed", f"Expected 'failed', got '{row[0]}'"


def test_wizard_generate_missing_section_fails(tmp_path):
    """wizard_generate with no section key marks task failed."""
    db = tmp_path / "t.db"
    from scripts.db import init_db, insert_task

    init_db(db)
    params = _json.dumps({"input": {"resume_text": "some text"}})  # missing section key
    task_id, _ = insert_task(db, "wizard_generate", 0, params=params)

    from scripts.task_runner import _run_task

    _run_task(db, task_id, "wizard_generate", 0, params=params)

    row = _fetch_one(db, "SELECT status FROM background_tasks WHERE id=?", (task_id,))
    assert row[0] == "failed"


def test_wizard_generate_null_params_fails(tmp_path):
    """wizard_generate with params=None marks task failed."""
    db = tmp_path / "t.db"
    from scripts.db import init_db, insert_task

    init_db(db)
    task_id, _ = insert_task(db, "wizard_generate", 0, params=None)

    from scripts.task_runner import _run_task

    _run_task(db, task_id, "wizard_generate", 0, params=None)

    row = _fetch_one(db, "SELECT status FROM background_tasks WHERE id=?", (task_id,))
    assert row[0] == "failed"


def test_wizard_generate_stores_result_as_json(tmp_path):
    """wizard_generate stores result JSON in error field on success."""
    db = tmp_path / "t.db"
    from scripts.db import init_db, insert_task

    init_db(db)
    params = _json.dumps({
        "section": "career_summary",
        "input": {"resume_text": "10 years Python"},
    })
    task_id, _ = insert_task(db, "wizard_generate", 0, params=params)

    # Mock _run_wizard_generate to return a simple string
    with patch(
        "scripts.task_runner._run_wizard_generate",
        return_value="Experienced Python developer.",
    ):
        from scripts.task_runner import _run_task

        _run_task(db, task_id, "wizard_generate", 0, params=params)

    row = _fetch_one(
        db, "SELECT status, error FROM background_tasks WHERE id=?", (task_id,)
    )
    assert row[0] == "completed", f"Expected 'completed', got '{row[0]}'"
    payload = _json.loads(row[1])
    assert payload["section"] == "career_summary"
    assert payload["result"] == "Experienced Python developer."


def test_wizard_generate_feedback_appended_to_prompt(tmp_path):
    """feedback and previous_result fields in input_data are appended to the prompt."""
    db = tmp_path / "t.db"
    from scripts.db import init_db, insert_task

    init_db(db)
    captured_prompts = []

    def mock_complete(prompt):
        captured_prompts.append(prompt)
        return "Revised career summary."

    params = _json.dumps({
        "section": "career_summary",
        "input": {
            "resume_text": "10 years Python dev",
            "previous_result": "Original summary text.",
            "feedback": "Make it shorter and focus on leadership.",
        },
    })
    task_id, _ = insert_task(db, "wizard_generate", 0, params=params)

    with patch("scripts.llm_router.LLMRouter") as MockRouter:
        MockRouter.return_value.complete.side_effect = mock_complete
        from scripts.task_runner import _run_task

        _run_task(db, task_id, "wizard_generate", 0, params=params)

    assert len(captured_prompts) == 1
    prompt_used = captured_prompts[0]
    assert "Original summary text." in prompt_used
    assert "Make it shorter and focus on leadership." in prompt_used
    assert "Please revise accordingly." in prompt_used


def test_wizard_generate_no_feedback_no_revision_block(tmp_path):
    """When no feedback/previous_result provided, prompt has no revision block."""
    db = tmp_path / "t.db"
    from scripts.db import init_db, insert_task

    init_db(db)
    captured_prompts = []

    def mock_complete(prompt):
        captured_prompts.append(prompt)
        return "Summary."

    params = _json.dumps({
        "section": "career_summary",
        "input": {"resume_text": "5 years QA engineer"},
    })
    task_id, _ = insert_task(db, "wizard_generate", 0, params=params)

    with patch("scripts.llm_router.LLMRouter") as MockRouter:
        MockRouter.return_value.complete.side_effect = mock_complete
        from scripts.task_runner import _run_task

        _run_task(db, task_id, "wizard_generate", 0, params=params)

    # Fail with a clear assertion (not an IndexError) if the router was never
    # called, matching the check in the feedback test above.
    assert len(captured_prompts) == 1
    prompt_used = captured_prompts[0]
    assert "Please revise accordingly." not in prompt_used
    assert "Previous output:" not in prompt_used