test: complete email sync test coverage — 44 new tests across all checklist sections
This commit is contained in:
parent
3076f051d7
commit
f3cfd258c6
1 changed files with 785 additions and 0 deletions
|
|
@ -328,3 +328,788 @@ def test_scan_unmatched_leads_linkedin_alert_skips_llm_path(tmp_path):
|
|||
|
||||
# LLM extraction must never be called for alert emails
|
||||
mock_llm.assert_not_called()
|
||||
|
||||
|
||||
# ── _has_rejection_or_ats_signal ──────────────────────────────────────────────
|
||||
|
||||
def test_rejection_phrase_at_body_boundary():
|
||||
"""Rejection phrase at char 1501 is NOT caught — only first 1500 chars checked."""
|
||||
from scripts.imap_sync import _has_rejection_or_ats_signal
|
||||
# "unfortunately" appears just past the 1500-char window
|
||||
padding = "x " * 750 # 1500 chars
|
||||
body = padding + "unfortunately we will not be moving forward"
|
||||
assert _has_rejection_or_ats_signal("No subject match", body) is False
|
||||
|
||||
|
||||
def test_rejection_phrase_within_body_limit():
|
||||
"""Rejection phrase within first 1500 chars IS caught."""
|
||||
from scripts.imap_sync import _has_rejection_or_ats_signal
|
||||
body = "We regret to inform you that we will not be moving forward."
|
||||
assert _has_rejection_or_ats_signal("Application Update", body) is True
|
||||
|
||||
|
||||
def test_dont_forget_right_single_quote():
|
||||
"""Right single quotation mark (\u2019) in 'don\u2019t forget' is blocked."""
|
||||
from scripts.imap_sync import _has_rejection_or_ats_signal
|
||||
body = "don\u2019t forget to complete your application"
|
||||
assert _has_rejection_or_ats_signal("Reminder", body) is True
|
||||
|
||||
|
||||
def test_dont_forget_left_single_quote():
|
||||
"""Left single quotation mark (\u2018) in 'don\u2018t forget' is blocked."""
|
||||
from scripts.imap_sync import _has_rejection_or_ats_signal
|
||||
body = "don\u2018t forget to complete your application"
|
||||
assert _has_rejection_or_ats_signal("Reminder", body) is True
|
||||
|
||||
|
||||
def test_ats_subject_phrase_not_matched_in_body_only():
|
||||
"""ATS confirm phrase in body alone does NOT trigger — subject-only check."""
|
||||
from scripts.imap_sync import _has_rejection_or_ats_signal
|
||||
# "thank you for applying" is an ATS subject phrase; must NOT be caught in body only
|
||||
body = "Hi Alex, thank you for applying to our Senior TAM role. We'd love to chat."
|
||||
assert _has_rejection_or_ats_signal("Interview Invitation", body) is False
|
||||
|
||||
|
||||
def test_ats_subject_phrase_matched_in_subject():
|
||||
"""ATS confirm phrase in subject triggers the filter."""
|
||||
from scripts.imap_sync import _has_rejection_or_ats_signal
|
||||
assert _has_rejection_or_ats_signal("Thank you for applying to Acme", "") is True
|
||||
|
||||
|
||||
def test_spam_subject_prefix_at_sign():
|
||||
"""Subject starting with '@' is blocked (Depop / social commerce pattern)."""
|
||||
from scripts.imap_sync import _has_rejection_or_ats_signal
|
||||
assert _has_rejection_or_ats_signal("@user sent you a special offer", "") is True
|
||||
|
||||
|
||||
def test_rejection_uppercase_lowercased():
|
||||
"""'UNFORTUNATELY' in body is downcased and caught correctly."""
|
||||
from scripts.imap_sync import _has_rejection_or_ats_signal
|
||||
assert _has_rejection_or_ats_signal("Update", "UNFORTUNATELY we have decided to go another direction.") is True
|
||||
|
||||
|
||||
def test_rejection_phrase_in_quoted_thread_beyond_limit_not_blocked():
|
||||
"""Rejection phrase beyond 1500-char body window does not block the email."""
|
||||
from scripts.imap_sync import _has_rejection_or_ats_signal
|
||||
clean_intro = "Hi Alex, we'd love to schedule a call with you. " * 30 # ~1500 chars
|
||||
quoted_footer = "\n\nOn Mon, Jan 1 wrote:\n> Unfortunately we went with another candidate."
|
||||
body = clean_intro + quoted_footer
|
||||
# The phrase lands after the 1500-char cutoff — should NOT be blocked
|
||||
assert _has_rejection_or_ats_signal("Interview Invitation", body) is False
|
||||
|
||||
|
||||
# ── _quote_folder ─────────────────────────────────────────────────────────────
|
||||
|
||||
def test_quote_folder_with_spaces():
|
||||
from scripts.imap_sync import _quote_folder
|
||||
assert _quote_folder("TO DO JOBS") == '"TO DO JOBS"'
|
||||
|
||||
|
||||
def test_quote_folder_no_spaces():
|
||||
from scripts.imap_sync import _quote_folder
|
||||
assert _quote_folder("INBOX") == "INBOX"
|
||||
|
||||
|
||||
def test_quote_folder_internal_double_quotes():
|
||||
from scripts.imap_sync import _quote_folder
|
||||
assert _quote_folder('My "Jobs"') == '"My \\"Jobs\\""'
|
||||
|
||||
|
||||
# ── _search_folder ────────────────────────────────────────────────────────────
|
||||
|
||||
def test_search_folder_nonexistent_returns_empty():
|
||||
"""_search_folder returns [] when folder SELECT raises (folder doesn't exist)."""
|
||||
from scripts.imap_sync import _search_folder
|
||||
conn = MagicMock()
|
||||
conn.select.side_effect = Exception("NO folder not found")
|
||||
result = _search_folder(conn, "DOES_NOT_EXIST", "ALL", "01-Jan-2026")
|
||||
assert result == []
|
||||
|
||||
|
||||
def test_search_folder_special_gmail_name():
|
||||
"""[Gmail]/All Mail folder name is quoted because it contains a space."""
|
||||
from scripts.imap_sync import _search_folder
|
||||
conn = MagicMock()
|
||||
conn.select.return_value = ("OK", [b"1"])
|
||||
conn.search.return_value = ("OK", [b"1 2"])
|
||||
result = _search_folder(conn, "[Gmail]/All Mail", "ALL", "01-Jan-2026")
|
||||
# Should not raise; select should be called with the quoted form
|
||||
conn.select.assert_called_once_with('"[Gmail]/All Mail"', readonly=True)
|
||||
assert result == [b"1", b"2"]
|
||||
|
||||
|
||||
# ── _get_existing_message_ids ─────────────────────────────────────────────────
|
||||
|
||||
def test_get_existing_message_ids_excludes_null(tmp_path):
|
||||
"""NULL message_id rows are excluded from the returned set."""
|
||||
import sqlite3
|
||||
from scripts.db import init_db, insert_job, add_contact
|
||||
from scripts.imap_sync import _get_existing_message_ids
|
||||
|
||||
db_path = tmp_path / "test.db"
|
||||
init_db(db_path)
|
||||
job_id = insert_job(db_path, {
|
||||
"title": "CSM", "company": "Acme", "url": "https://acme.com/1",
|
||||
"source": "test", "location": "", "is_remote": 0,
|
||||
"salary": "", "description": "", "date_found": "2026-01-01",
|
||||
})
|
||||
# Insert contact with NULL message_id via raw SQL
|
||||
conn = sqlite3.connect(db_path)
|
||||
conn.execute(
|
||||
"INSERT INTO job_contacts (job_id, direction, subject, from_addr, body, received_at) "
|
||||
"VALUES (?, 'inbound', 'subj', 'f@x.com', 'body', '2026-01-01')",
|
||||
(job_id,)
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
ids = _get_existing_message_ids(job_id, db_path)
|
||||
assert None not in ids
|
||||
assert "" not in ids
|
||||
|
||||
|
||||
def test_get_existing_message_ids_excludes_empty_string(tmp_path):
|
||||
"""Empty-string message_id rows are excluded."""
|
||||
import sqlite3
|
||||
from scripts.db import init_db, insert_job
|
||||
from scripts.imap_sync import _get_existing_message_ids
|
||||
|
||||
db_path = tmp_path / "test.db"
|
||||
init_db(db_path)
|
||||
job_id = insert_job(db_path, {
|
||||
"title": "CSM", "company": "Acme", "url": "https://acme.com/2",
|
||||
"source": "test", "location": "", "is_remote": 0,
|
||||
"salary": "", "description": "", "date_found": "2026-01-01",
|
||||
})
|
||||
conn = sqlite3.connect(db_path)
|
||||
conn.execute(
|
||||
"INSERT INTO job_contacts (job_id, direction, subject, from_addr, body, received_at, message_id) "
|
||||
"VALUES (?, 'inbound', 'subj', 'f@x.com', 'body', '2026-01-01', '')",
|
||||
(job_id,)
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
ids = _get_existing_message_ids(job_id, db_path)
|
||||
assert "" not in ids
|
||||
|
||||
|
||||
def test_get_existing_message_ids_no_contacts(tmp_path):
|
||||
"""Job with no contacts returns an empty set."""
|
||||
from scripts.db import init_db, insert_job
|
||||
from scripts.imap_sync import _get_existing_message_ids
|
||||
|
||||
db_path = tmp_path / "test.db"
|
||||
init_db(db_path)
|
||||
job_id = insert_job(db_path, {
|
||||
"title": "CSM", "company": "Acme", "url": "https://acme.com/3",
|
||||
"source": "test", "location": "", "is_remote": 0,
|
||||
"salary": "", "description": "", "date_found": "2026-01-01",
|
||||
})
|
||||
assert _get_existing_message_ids(job_id, db_path) == set()
|
||||
|
||||
|
||||
# ── _parse_message ────────────────────────────────────────────────────────────
|
||||
|
||||
def test_parse_message_no_message_id_returns_none():
|
||||
"""Email with no Message-ID header returns None."""
|
||||
from scripts.imap_sync import _parse_message
|
||||
|
||||
raw = (
|
||||
b"From: recruiter@acme.com\r\n"
|
||||
b"Subject: Interview Invitation\r\n"
|
||||
b"\r\n"
|
||||
b"Hi Alex!"
|
||||
)
|
||||
conn = MagicMock()
|
||||
conn.fetch.return_value = ("OK", [(b"1 (RFC822 {40})", raw)])
|
||||
assert _parse_message(conn, b"1") is None
|
||||
|
||||
|
||||
def test_parse_message_rfc2047_subject_decoded():
|
||||
"""RFC2047-encoded subject is decoded correctly."""
|
||||
from scripts.imap_sync import _parse_message
|
||||
|
||||
# "Interview" encoded as UTF-8 base64
|
||||
raw = (
|
||||
b"From: recruiter@acme.com\r\n"
|
||||
b"Message-ID: <decode-test@acme.com>\r\n"
|
||||
b"Subject: =?utf-8?b?SW50ZXJ2aWV3?=\r\n"
|
||||
b"\r\n"
|
||||
b"Let's schedule a call."
|
||||
)
|
||||
conn = MagicMock()
|
||||
conn.fetch.return_value = ("OK", [(b"1 (RFC822 {100})", raw)])
|
||||
result = _parse_message(conn, b"1")
|
||||
assert result is not None
|
||||
assert "Interview" in result["subject"]
|
||||
|
||||
|
||||
# ── classify_stage_signal ─────────────────────────────────────────────────────
|
||||
|
||||
def test_classify_stage_signal_returns_neutral_on_no_label_match():
|
||||
"""Returns 'neutral' when LLM output matches no known label."""
|
||||
from scripts.imap_sync import classify_stage_signal
|
||||
with patch("scripts.imap_sync._CLASSIFIER_ROUTER") as mock_router:
|
||||
mock_router.complete.return_value = "I cannot determine the category."
|
||||
result = classify_stage_signal("Generic update", "No clear signal here.")
|
||||
assert result == "neutral"
|
||||
|
||||
|
||||
# ── extract_lead_info ─────────────────────────────────────────────────────────
|
||||
|
||||
def test_extract_lead_info_returns_none_on_llm_error():
|
||||
"""extract_lead_info returns (None, None) when LLM call raises."""
|
||||
from scripts.imap_sync import extract_lead_info
|
||||
with patch("scripts.imap_sync._CLASSIFIER_ROUTER") as mock_router:
|
||||
mock_router.complete.side_effect = RuntimeError("timeout")
|
||||
result = extract_lead_info("Senior TAM at Wiz", "Hi Alex…", "r@wiz.com")
|
||||
assert result == (None, None)
|
||||
|
||||
|
||||
# ── _scan_unmatched_leads — signal gating ─────────────────────────────────────
|
||||
|
||||
_PLAIN_RECRUIT_EMAIL = {
|
||||
"message_id": "<recruit-001@acme.com>",
|
||||
"from_addr": "recruiter@acme.com",
|
||||
"to_addr": "alex@example.com",
|
||||
"subject": "Interview Opportunity at Acme",
|
||||
"body": "Hi Alex, we have an exciting opportunity for you.",
|
||||
"date": "2026-02-25 10:00:00",
|
||||
}
|
||||
|
||||
|
||||
def test_scan_unmatched_leads_skips_when_signal_none(tmp_path):
|
||||
"""When classify_stage_signal returns None, lead is not inserted."""
|
||||
from scripts.db import init_db
|
||||
from scripts.imap_sync import _scan_unmatched_leads
|
||||
|
||||
db_path = tmp_path / "test.db"
|
||||
init_db(db_path)
|
||||
|
||||
with patch("scripts.imap_sync._search_folder", return_value=[b"1"]), \
|
||||
patch("scripts.imap_sync._parse_message", return_value=_PLAIN_RECRUIT_EMAIL), \
|
||||
patch("scripts.imap_sync.classify_stage_signal", return_value=None), \
|
||||
patch("scripts.imap_sync.extract_lead_info") as mock_extract:
|
||||
result = _scan_unmatched_leads(MagicMock(), {"lookback_days": 90}, db_path, set())
|
||||
|
||||
assert result == 0
|
||||
mock_extract.assert_not_called()
|
||||
|
||||
|
||||
def test_scan_unmatched_leads_skips_when_signal_rejected(tmp_path):
|
||||
"""When signal is 'rejected', lead is not inserted."""
|
||||
from scripts.db import init_db
|
||||
from scripts.imap_sync import _scan_unmatched_leads
|
||||
|
||||
db_path = tmp_path / "test.db"
|
||||
init_db(db_path)
|
||||
|
||||
with patch("scripts.imap_sync._search_folder", return_value=[b"1"]), \
|
||||
patch("scripts.imap_sync._parse_message", return_value=_PLAIN_RECRUIT_EMAIL), \
|
||||
patch("scripts.imap_sync.classify_stage_signal", return_value="rejected"), \
|
||||
patch("scripts.imap_sync.extract_lead_info") as mock_extract:
|
||||
result = _scan_unmatched_leads(MagicMock(), {"lookback_days": 90}, db_path, set())
|
||||
|
||||
assert result == 0
|
||||
mock_extract.assert_not_called()
|
||||
|
||||
|
||||
def test_scan_unmatched_leads_proceeds_when_signal_neutral(tmp_path):
|
||||
"""When signal is 'neutral', LLM extraction is still attempted."""
|
||||
from scripts.db import init_db
|
||||
from scripts.imap_sync import _scan_unmatched_leads
|
||||
|
||||
db_path = tmp_path / "test.db"
|
||||
init_db(db_path)
|
||||
|
||||
with patch("scripts.imap_sync._search_folder", return_value=[b"1"]), \
|
||||
patch("scripts.imap_sync._parse_message", return_value=_PLAIN_RECRUIT_EMAIL), \
|
||||
patch("scripts.imap_sync.classify_stage_signal", return_value="neutral"), \
|
||||
patch("scripts.imap_sync.extract_lead_info", return_value=("Acme", "Senior TAM")), \
|
||||
patch("scripts.task_runner.submit_task"):
|
||||
result = _scan_unmatched_leads(MagicMock(), {"lookback_days": 90}, db_path, set())
|
||||
|
||||
assert result == 1
|
||||
|
||||
|
||||
def test_scan_unmatched_leads_rejection_phrase_blocks_llm(tmp_path):
|
||||
"""Email with rejection phrase in body is filtered before LLM is called."""
|
||||
from scripts.db import init_db
|
||||
from scripts.imap_sync import _scan_unmatched_leads
|
||||
|
||||
db_path = tmp_path / "test.db"
|
||||
init_db(db_path)
|
||||
|
||||
rejection_email = {**_PLAIN_RECRUIT_EMAIL,
|
||||
"body": "Unfortunately we have decided not to move forward."}
|
||||
|
||||
with patch("scripts.imap_sync._search_folder", return_value=[b"1"]), \
|
||||
patch("scripts.imap_sync._parse_message", return_value=rejection_email), \
|
||||
patch("scripts.imap_sync.classify_stage_signal") as mock_classify:
|
||||
result = _scan_unmatched_leads(MagicMock(), {"lookback_days": 90}, db_path, set())
|
||||
|
||||
assert result == 0
|
||||
mock_classify.assert_not_called()
|
||||
|
||||
|
||||
def test_scan_unmatched_leads_genuine_lead_has_synthetic_url(tmp_path):
|
||||
"""A genuine lead is inserted with a synthetic email:// URL."""
|
||||
import sqlite3
|
||||
from scripts.db import init_db
|
||||
from scripts.imap_sync import _scan_unmatched_leads
|
||||
|
||||
db_path = tmp_path / "test.db"
|
||||
init_db(db_path)
|
||||
|
||||
with patch("scripts.imap_sync._search_folder", return_value=[b"1"]), \
|
||||
patch("scripts.imap_sync._parse_message", return_value=_PLAIN_RECRUIT_EMAIL), \
|
||||
patch("scripts.imap_sync.classify_stage_signal", return_value="interview_scheduled"), \
|
||||
patch("scripts.imap_sync.extract_lead_info", return_value=("Acme", "Senior TAM")), \
|
||||
patch("scripts.task_runner.submit_task"):
|
||||
result = _scan_unmatched_leads(MagicMock(), {"lookback_days": 90}, db_path, set())
|
||||
|
||||
assert result == 1
|
||||
conn = sqlite3.connect(db_path)
|
||||
row = conn.execute("SELECT url FROM jobs LIMIT 1").fetchone()
|
||||
conn.close()
|
||||
assert row[0].startswith("email://")
|
||||
|
||||
|
||||
def test_scan_unmatched_leads_no_reinsert_on_second_run(tmp_path):
|
||||
"""Same email not re-inserted on a second sync run (known_message_ids dedup)."""
|
||||
from scripts.db import init_db
|
||||
from scripts.imap_sync import _scan_unmatched_leads
|
||||
|
||||
db_path = tmp_path / "test.db"
|
||||
init_db(db_path)
|
||||
|
||||
known = set()
|
||||
shared_kwargs = dict(
|
||||
conn=MagicMock(),
|
||||
cfg={"lookback_days": 90},
|
||||
db_path=db_path,
|
||||
)
|
||||
|
||||
with patch("scripts.imap_sync._search_folder", return_value=[b"1"]), \
|
||||
patch("scripts.imap_sync._parse_message", return_value=_PLAIN_RECRUIT_EMAIL), \
|
||||
patch("scripts.imap_sync.classify_stage_signal", return_value="neutral"), \
|
||||
patch("scripts.imap_sync.extract_lead_info", return_value=("Acme", "TAM")), \
|
||||
patch("scripts.task_runner.submit_task"):
|
||||
first = _scan_unmatched_leads(**shared_kwargs, known_message_ids=known)
|
||||
second = _scan_unmatched_leads(**shared_kwargs, known_message_ids=known)
|
||||
|
||||
assert first == 1
|
||||
assert second == 0
|
||||
|
||||
|
||||
def test_scan_unmatched_leads_extract_none_no_insert(tmp_path):
|
||||
"""When extract_lead_info returns (None, None), no job is inserted."""
|
||||
import sqlite3
|
||||
from scripts.db import init_db
|
||||
from scripts.imap_sync import _scan_unmatched_leads
|
||||
|
||||
db_path = tmp_path / "test.db"
|
||||
init_db(db_path)
|
||||
|
||||
with patch("scripts.imap_sync._search_folder", return_value=[b"1"]), \
|
||||
patch("scripts.imap_sync._parse_message", return_value=_PLAIN_RECRUIT_EMAIL), \
|
||||
patch("scripts.imap_sync.classify_stage_signal", return_value="neutral"), \
|
||||
patch("scripts.imap_sync.extract_lead_info", return_value=(None, None)):
|
||||
result = _scan_unmatched_leads(MagicMock(), {"lookback_days": 90}, db_path, set())
|
||||
|
||||
assert result == 0
|
||||
conn = sqlite3.connect(db_path)
|
||||
count = conn.execute("SELECT COUNT(*) FROM jobs").fetchone()[0]
|
||||
conn.close()
|
||||
assert count == 0
|
||||
|
||||
|
||||
# ── _scan_todo_label ──────────────────────────────────────────────────────────
|
||||
|
||||
def _make_job(db_path, company="Acme", url="https://acme.com/job/1"):
|
||||
from scripts.db import init_db, insert_job
|
||||
init_db(db_path)
|
||||
return insert_job(db_path, {
|
||||
"title": "CSM", "company": company, "url": url,
|
||||
"source": "test", "location": "", "is_remote": 0,
|
||||
"salary": "", "description": "", "date_found": "2026-01-01",
|
||||
})
|
||||
|
||||
|
||||
def test_scan_todo_label_empty_string_returns_zero(tmp_path):
|
||||
from scripts.imap_sync import _scan_todo_label
|
||||
db_path = tmp_path / "test.db"
|
||||
_make_job(db_path)
|
||||
assert _scan_todo_label(MagicMock(), {"todo_label": ""}, db_path, [], set()) == 0
|
||||
|
||||
|
||||
def test_scan_todo_label_missing_key_returns_zero(tmp_path):
|
||||
from scripts.imap_sync import _scan_todo_label
|
||||
db_path = tmp_path / "test.db"
|
||||
_make_job(db_path)
|
||||
assert _scan_todo_label(MagicMock(), {}, db_path, [], set()) == 0
|
||||
|
||||
|
||||
def test_scan_todo_label_folder_not_found_returns_zero(tmp_path):
|
||||
"""When folder doesn't exist on server, returns 0 without crashing."""
|
||||
from scripts.imap_sync import _scan_todo_label
|
||||
db_path = tmp_path / "test.db"
|
||||
_make_job(db_path)
|
||||
with patch("scripts.imap_sync._search_folder", return_value=[]):
|
||||
result = _scan_todo_label(
|
||||
MagicMock(), {"todo_label": "TO DO JOBS", "lookback_days": 90},
|
||||
db_path, [], set()
|
||||
)
|
||||
assert result == 0
|
||||
|
||||
|
||||
def test_scan_todo_label_email_matches_company_and_keyword(tmp_path):
|
||||
"""Email matching company name + TODO action keyword gets attached."""
|
||||
from scripts.db import get_contacts
|
||||
from scripts.imap_sync import _scan_todo_label
|
||||
|
||||
db_path = tmp_path / "test.db"
|
||||
job_id = _make_job(db_path)
|
||||
active_jobs = [{"id": job_id, "company": "Acme", "url": "https://acme.com/job/1"}]
|
||||
|
||||
todo_email = {
|
||||
"message_id": "<todo-001@acme.com>",
|
||||
"from_addr": "recruiter@acme.com",
|
||||
"to_addr": "alex@example.com",
|
||||
"subject": "Interview scheduled with Acme",
|
||||
"body": "Hi Alex, your interview is confirmed.",
|
||||
"date": "2026-02-25 10:00:00",
|
||||
}
|
||||
|
||||
with patch("scripts.imap_sync._search_folder", return_value=[b"1"]), \
|
||||
patch("scripts.imap_sync._parse_message", return_value=todo_email), \
|
||||
patch("scripts.imap_sync.classify_stage_signal", return_value="neutral"):
|
||||
result = _scan_todo_label(
|
||||
MagicMock(), {"todo_label": "TO DO JOBS", "lookback_days": 90},
|
||||
db_path, active_jobs, set()
|
||||
)
|
||||
|
||||
assert result == 1
|
||||
contacts = get_contacts(db_path, job_id=job_id)
|
||||
assert len(contacts) == 1
|
||||
assert contacts[0]["subject"] == "Interview scheduled with Acme"
|
||||
|
||||
|
||||
def test_scan_todo_label_no_action_keyword_skipped(tmp_path):
|
||||
"""Email with company match but no TODO keyword is skipped."""
|
||||
from scripts.imap_sync import _scan_todo_label
|
||||
|
||||
db_path = tmp_path / "test.db"
|
||||
job_id = _make_job(db_path)
|
||||
active_jobs = [{"id": job_id, "company": "Acme", "url": "https://acme.com/job/1"}]
|
||||
|
||||
no_keyword_email = {
|
||||
"message_id": "<todo-002@acme.com>",
|
||||
"from_addr": "noreply@acme.com",
|
||||
"to_addr": "alex@example.com",
|
||||
"subject": "Acme newsletter",
|
||||
"body": "Company updates this week.",
|
||||
"date": "2026-02-25 10:00:00",
|
||||
}
|
||||
|
||||
with patch("scripts.imap_sync._search_folder", return_value=[b"1"]), \
|
||||
patch("scripts.imap_sync._parse_message", return_value=no_keyword_email):
|
||||
result = _scan_todo_label(
|
||||
MagicMock(), {"todo_label": "TO DO JOBS", "lookback_days": 90},
|
||||
db_path, active_jobs, set()
|
||||
)
|
||||
|
||||
assert result == 0
|
||||
|
||||
|
||||
def test_scan_todo_label_no_company_match_skipped(tmp_path):
|
||||
"""Email with no company name in from/subject/body[:300] is skipped."""
|
||||
from scripts.imap_sync import _scan_todo_label
|
||||
|
||||
db_path = tmp_path / "test.db"
|
||||
job_id = _make_job(db_path, company="Acme")
|
||||
active_jobs = [{"id": job_id, "company": "Acme", "url": "https://acme.com/job/1"}]
|
||||
|
||||
unrelated_email = {
|
||||
"message_id": "<todo-003@other.com>",
|
||||
"from_addr": "recruiter@other.com",
|
||||
"to_addr": "alex@example.com",
|
||||
"subject": "Interview scheduled with OtherCo",
|
||||
"body": "Hi Alex, interview with OtherCo confirmed.",
|
||||
"date": "2026-02-25 10:00:00",
|
||||
}
|
||||
|
||||
with patch("scripts.imap_sync._search_folder", return_value=[b"1"]), \
|
||||
patch("scripts.imap_sync._parse_message", return_value=unrelated_email):
|
||||
result = _scan_todo_label(
|
||||
MagicMock(), {"todo_label": "TO DO JOBS", "lookback_days": 90},
|
||||
db_path, active_jobs, set()
|
||||
)
|
||||
|
||||
assert result == 0
|
||||
|
||||
|
||||
def test_scan_todo_label_duplicate_message_id_not_reinserted(tmp_path):
|
||||
"""Email already in known_message_ids is not re-attached."""
|
||||
from scripts.imap_sync import _scan_todo_label
|
||||
|
||||
db_path = tmp_path / "test.db"
|
||||
job_id = _make_job(db_path)
|
||||
active_jobs = [{"id": job_id, "company": "Acme", "url": "https://acme.com/job/1"}]
|
||||
|
||||
todo_email = {
|
||||
"message_id": "<already-seen@acme.com>",
|
||||
"from_addr": "recruiter@acme.com",
|
||||
"to_addr": "alex@example.com",
|
||||
"subject": "Interview scheduled with Acme",
|
||||
"body": "Hi Alex.",
|
||||
"date": "2026-02-25 10:00:00",
|
||||
}
|
||||
|
||||
known = {"<already-seen@acme.com>"}
|
||||
|
||||
with patch("scripts.imap_sync._search_folder", return_value=[b"1"]), \
|
||||
patch("scripts.imap_sync._parse_message", return_value=todo_email):
|
||||
result = _scan_todo_label(
|
||||
MagicMock(), {"todo_label": "TO DO JOBS", "lookback_days": 90},
|
||||
db_path, active_jobs, known
|
||||
)
|
||||
|
||||
assert result == 0
|
||||
|
||||
|
||||
def test_scan_todo_label_stage_signal_set_for_non_neutral(tmp_path):
|
||||
"""Non-neutral classifier signal is written to the contact row."""
|
||||
import sqlite3
|
||||
from scripts.imap_sync import _scan_todo_label
|
||||
|
||||
db_path = tmp_path / "test.db"
|
||||
job_id = _make_job(db_path)
|
||||
active_jobs = [{"id": job_id, "company": "Acme", "url": "https://acme.com/job/1"}]
|
||||
|
||||
todo_email = {
|
||||
"message_id": "<signal-001@acme.com>",
|
||||
"from_addr": "recruiter@acme.com",
|
||||
"to_addr": "alex@example.com",
|
||||
"subject": "Interview scheduled with Acme",
|
||||
"body": "Your phone screen is confirmed.",
|
||||
"date": "2026-02-25 10:00:00",
|
||||
}
|
||||
|
||||
with patch("scripts.imap_sync._search_folder", return_value=[b"1"]), \
|
||||
patch("scripts.imap_sync._parse_message", return_value=todo_email), \
|
||||
patch("scripts.imap_sync.classify_stage_signal", return_value="interview_scheduled"):
|
||||
_scan_todo_label(
|
||||
MagicMock(), {"todo_label": "TO DO JOBS", "lookback_days": 90},
|
||||
db_path, active_jobs, set()
|
||||
)
|
||||
|
||||
conn = sqlite3.connect(db_path)
|
||||
row = conn.execute("SELECT stage_signal FROM job_contacts LIMIT 1").fetchone()
|
||||
conn.close()
|
||||
assert row[0] == "interview_scheduled"
|
||||
|
||||
|
||||
def test_scan_todo_label_body_fallback_matches(tmp_path):
|
||||
"""Company name only in body[:300] still triggers a match (body fallback)."""
|
||||
from scripts.db import get_contacts
|
||||
from scripts.imap_sync import _scan_todo_label
|
||||
|
||||
db_path = tmp_path / "test.db"
|
||||
job_id = _make_job(db_path, company="Acme")
|
||||
active_jobs = [{"id": job_id, "company": "Acme", "url": "https://acme.com/job/1"}]
|
||||
|
||||
# Company not in from_addr or subject — only in body
|
||||
body_only_email = {
|
||||
"message_id": "<body-fallback@noreply.greenhouse.io>",
|
||||
"from_addr": "noreply@greenhouse.io",
|
||||
"to_addr": "alex@example.com",
|
||||
"subject": "Interview scheduled",
|
||||
"body": "Your interview with Acme has been confirmed for tomorrow.",
|
||||
"date": "2026-02-25 10:00:00",
|
||||
}
|
||||
|
||||
with patch("scripts.imap_sync._search_folder", return_value=[b"1"]), \
|
||||
patch("scripts.imap_sync._parse_message", return_value=body_only_email), \
|
||||
patch("scripts.imap_sync.classify_stage_signal", return_value="neutral"):
|
||||
result = _scan_todo_label(
|
||||
MagicMock(), {"todo_label": "TO DO JOBS", "lookback_days": 90},
|
||||
db_path, active_jobs, set()
|
||||
)
|
||||
|
||||
assert result == 1
|
||||
|
||||
|
||||
# ── sync_all ──────────────────────────────────────────────────────────────────
|
||||
|
||||
def test_sync_all_no_active_jobs_returns_full_dict(tmp_path):
|
||||
"""With no active jobs, sync_all returns a dict with all 6 expected keys."""
|
||||
from scripts.db import init_db
|
||||
from scripts.imap_sync import sync_all
|
||||
|
||||
db_path = tmp_path / "test.db"
|
||||
init_db(db_path)
|
||||
|
||||
with patch("scripts.imap_sync.load_config", return_value={}), \
|
||||
patch("scripts.imap_sync.get_interview_jobs", return_value={}):
|
||||
result = sync_all(db_path=db_path)
|
||||
|
||||
expected_keys = {"synced", "inbound", "outbound", "new_leads", "todo_attached", "errors"}
|
||||
assert set(result.keys()) == expected_keys
|
||||
assert result["todo_attached"] == 0
|
||||
|
||||
|
||||
def test_sync_all_on_stage_callback_fires(tmp_path):
|
||||
"""on_stage callback is called with expected stage labels."""
|
||||
from scripts.db import init_db
|
||||
from scripts.imap_sync import sync_all
|
||||
|
||||
db_path = tmp_path / "test.db"
|
||||
init_db(db_path)
|
||||
|
||||
fake_job = {"id": 1, "company": "Acme", "url": "https://acme.com/1"}
|
||||
stages = []
|
||||
conn_mock = MagicMock()
|
||||
conn_mock.logout.return_value = ("OK", [])
|
||||
|
||||
with patch("scripts.imap_sync.load_config", return_value={}), \
|
||||
patch("scripts.imap_sync.get_interview_jobs", return_value={"applied": [fake_job]}), \
|
||||
patch("scripts.imap_sync.connect", return_value=conn_mock), \
|
||||
patch("scripts.imap_sync.sync_job_emails", return_value=(0, 0)), \
|
||||
patch("scripts.db.get_all_message_ids", return_value=set()), \
|
||||
patch("scripts.imap_sync._scan_todo_label", return_value=0), \
|
||||
patch("scripts.imap_sync._scan_unmatched_leads", return_value=0):
|
||||
sync_all(db_path=db_path, on_stage=stages.append)
|
||||
|
||||
assert "connecting" in stages
|
||||
assert "scanning todo label" in stages
|
||||
assert "scanning leads" in stages
|
||||
|
||||
|
||||
def test_sync_all_per_job_exception_continues(tmp_path):
|
||||
"""Exception for one job does not abort sync of remaining jobs."""
|
||||
from scripts.db import init_db
|
||||
from scripts.imap_sync import sync_all
|
||||
|
||||
db_path = tmp_path / "test.db"
|
||||
init_db(db_path)
|
||||
|
||||
fake_jobs = [
|
||||
{"id": 1, "company": "Co0", "url": "https://co0.com/1"},
|
||||
{"id": 2, "company": "Co1", "url": "https://co1.com/1"},
|
||||
]
|
||||
conn_mock = MagicMock()
|
||||
conn_mock.logout.return_value = ("OK", [])
|
||||
|
||||
call_count = {"n": 0}
|
||||
def flaky_sync(job, *args, **kwargs):
|
||||
call_count["n"] += 1
|
||||
if call_count["n"] == 1:
|
||||
raise RuntimeError("IMAP timeout")
|
||||
return (1, 0)
|
||||
|
||||
with patch("scripts.imap_sync.load_config", return_value={}), \
|
||||
patch("scripts.imap_sync.get_interview_jobs", return_value={"applied": fake_jobs}), \
|
||||
patch("scripts.imap_sync.connect", return_value=conn_mock), \
|
||||
patch("scripts.imap_sync.sync_job_emails", side_effect=flaky_sync), \
|
||||
patch("scripts.db.get_all_message_ids", return_value=set()), \
|
||||
patch("scripts.imap_sync._scan_todo_label", return_value=0), \
|
||||
patch("scripts.imap_sync._scan_unmatched_leads", return_value=0):
|
||||
result = sync_all(db_path=db_path)
|
||||
|
||||
assert len(result["errors"]) == 1
|
||||
assert result["synced"] == 1 # second job succeeded
|
||||
|
||||
|
||||
# ── Performance / edge cases ──────────────────────────────────────────────────
|
||||
|
||||
def test_parse_message_large_body_truncated():
|
||||
"""Body longer than 4000 chars is silently truncated to 4000."""
|
||||
from scripts.imap_sync import _parse_message
|
||||
|
||||
big_body = ("x" * 10_000).encode()
|
||||
raw = (
|
||||
b"From: r@acme.com\r\nMessage-ID: <big@acme.com>\r\n"
|
||||
b"Subject: Interview\r\n\r\n"
|
||||
) + big_body
|
||||
conn = MagicMock()
|
||||
conn.fetch.return_value = ("OK", [(b"1 (RFC822)", raw)])
|
||||
result = _parse_message(conn, b"1")
|
||||
assert result is not None
|
||||
assert len(result["body"]) <= 4000
|
||||
|
||||
|
||||
def test_parse_message_binary_attachment_no_crash():
|
||||
"""Email with binary attachment returns a valid dict without crashing."""
|
||||
from scripts.imap_sync import _parse_message
|
||||
import email as _email
|
||||
from email.mime.multipart import MIMEMultipart
|
||||
from email.mime.text import MIMEText
|
||||
from email.mime.application import MIMEApplication
|
||||
|
||||
msg = MIMEMultipart()
|
||||
msg["From"] = "r@acme.com"
|
||||
msg["Message-ID"] = "<attach@acme.com>"
|
||||
msg["Subject"] = "Offer letter attached"
|
||||
msg.attach(MIMEText("Please find the attached offer letter.", "plain"))
|
||||
msg.attach(MIMEApplication(b"\x00\x01\x02\x03" * 100, Name="offer.pdf"))
|
||||
|
||||
conn = MagicMock()
|
||||
conn.fetch.return_value = ("OK", [(b"1 (RFC822)", msg.as_bytes())])
|
||||
result = _parse_message(conn, b"1")
|
||||
assert result is not None
|
||||
assert result["message_id"] == "<attach@acme.com>"
|
||||
|
||||
|
||||
def test_parse_message_multiple_text_parts_takes_first():
|
||||
"""Email with multiple text/plain MIME parts uses only the first."""
|
||||
from scripts.imap_sync import _parse_message
|
||||
from email.mime.multipart import MIMEMultipart
|
||||
from email.mime.text import MIMEText
|
||||
|
||||
msg = MIMEMultipart()
|
||||
msg["From"] = "r@acme.com"
|
||||
msg["Message-ID"] = "<multipart@acme.com>"
|
||||
msg["Subject"] = "Interview"
|
||||
msg.attach(MIMEText("First part — the real body.", "plain"))
|
||||
msg.attach(MIMEText("Second part — should be ignored.", "plain"))
|
||||
|
||||
conn = MagicMock()
|
||||
conn.fetch.return_value = ("OK", [(b"1 (RFC822)", msg.as_bytes())])
|
||||
result = _parse_message(conn, b"1")
|
||||
assert result is not None
|
||||
assert "First part" in result["body"]
|
||||
assert "Second part" not in result["body"]
|
||||
|
||||
|
||||
def test_get_all_message_ids_performance(tmp_path):
|
||||
"""get_all_message_ids with 1000 rows completes quickly (smoke test for scale)."""
|
||||
import sqlite3
|
||||
import time
|
||||
from scripts.db import init_db, insert_job
|
||||
from scripts.db import get_all_message_ids
|
||||
|
||||
db_path = tmp_path / "test.db"
|
||||
init_db(db_path)
|
||||
job_id = insert_job(db_path, {
|
||||
"title": "CSM", "company": "Acme", "url": "https://acme.com/perf",
|
||||
"source": "test", "location": "", "is_remote": 0,
|
||||
"salary": "", "description": "", "date_found": "2026-01-01",
|
||||
})
|
||||
|
||||
conn = sqlite3.connect(db_path)
|
||||
conn.executemany(
|
||||
"INSERT INTO job_contacts (job_id, direction, subject, from_addr, body, received_at, message_id) "
|
||||
"VALUES (?, 'inbound', 'subj', 'f@x.com', 'body', '2026-01-01', ?)",
|
||||
[(job_id, f"<mid-{i}@x.com>") for i in range(1000)]
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
start = time.monotonic()
|
||||
ids = get_all_message_ids(db_path)
|
||||
elapsed = time.monotonic() - start
|
||||
|
||||
assert len(ids) == 1000
|
||||
assert elapsed < 1.0
|
||||
|
|
|
|||
Loading…
Reference in a new issue