# peregrine/tests/test_imap_sync.py
#
# 1115 lines
# 44 KiB
# Python

"""Tests for imap_sync helpers (no live IMAP connection required)."""
import pytest
from unittest.mock import patch, MagicMock
def test_classify_stage_signal_interview():
    """A call-scheduling email is classified as ``interview_scheduled``.

    The classifier router is mocked to emit that label, so the test checks
    that classify_stage_signal hands the label back to the caller.
    """
    from scripts.imap_sync import classify_stage_signal

    subject = "Let's schedule a call"
    body = "Hi Alex, we'd love to book a 30-min phone screen with you."

    with patch("scripts.imap_sync._CLASSIFIER_ROUTER") as router:
        router.complete.return_value = "interview_scheduled"
        signal = classify_stage_signal(subject, body)

    assert signal == "interview_scheduled"
def test_classify_stage_signal_returns_none_on_error():
    """classify_stage_signal yields None when the mocked LLM call raises."""
    from scripts.imap_sync import classify_stage_signal

    llm_failure = RuntimeError("model not loaded")

    with patch("scripts.imap_sync._CLASSIFIER_ROUTER") as router:
        router.complete.side_effect = llm_failure
        signal = classify_stage_signal("subject", "body")

    assert signal is None
def test_classify_stage_signal_strips_think_tags():
    """A leading ``<think>...</think>`` block in the LLM reply is discarded.

    The mocked router returns reasoning text followed by the label; only the
    label ``rejected`` is expected to come back.
    """
    from scripts.imap_sync import classify_stage_signal

    raw_reply = "<think>Let me think...</think>\nrejected"

    with patch("scripts.imap_sync._CLASSIFIER_ROUTER") as router:
        router.complete.return_value = raw_reply
        signal = classify_stage_signal(
            "Update on your application",
            "We went with another candidate.",
        )

    assert signal == "rejected"
def test_normalise_company():
    """_normalise_company drops legal suffixes and leaves plain names alone."""
    from scripts.imap_sync import _normalise_company

    # (raw company name, expected normalised form)
    expectations = (
        ("DataStax, Inc.", "DataStax"),
        ("Wiz Ltd", "Wiz"),
        ("Crusoe Energy", "Crusoe Energy"),
    )
    for raw_name, cleaned in expectations:
        assert _normalise_company(raw_name) == cleaned
def test_company_search_terms_excludes_job_board_sld():
    """Job-board domains (linkedin.com, indeed.com) never become match terms.

    A company's own domain, by contrast, contributes its second-level
    domain ("crusoe" for crusoe.ai) to the search terms.
    """
    from scripts.imap_sync import _company_search_terms

    # Job sourced from LinkedIn: only the company name may be used.
    linkedin_terms = _company_search_terms(
        "Bamboo Health", "https://www.linkedin.com/jobs/view/123"
    )
    assert "linkedin" not in linkedin_terms
    assert "bamboo health" in linkedin_terms

    # Job hosted on the company's own site: the SLD is a valid term.
    own_site_terms = _company_search_terms(
        "Crusoe Energy", "https://crusoe.ai/jobs/456"
    )
    assert "crusoe" in own_site_terms

    # Job sourced from Indeed: "indeed" must be excluded as well.
    indeed_terms = _company_search_terms(
        "DoorDash", "https://www.indeed.com/viewjob?jk=abc"
    )
    assert "indeed" not in indeed_terms
    assert "doordash" in indeed_terms
def test_has_recruitment_keyword():
    """Recruitment-style subjects match; an unrelated subject does not."""
    from scripts.imap_sync import _has_recruitment_keyword

    recruiting_subjects = (
        "Interview Invitation — Senior TAM",
        "Your application with DataStax",
    )
    for subject in recruiting_subjects:
        assert _has_recruitment_keyword(subject)

    assert not _has_recruitment_keyword("Team lunch tomorrow")
def test_extract_lead_info_returns_company_and_title():
    """The LLM's JSON reply is turned into a ``(company, title)`` tuple."""
    from scripts.imap_sync import extract_lead_info

    llm_json = '{"company": "Wiz", "title": "Senior TAM"}'

    with patch("scripts.imap_sync._CLASSIFIER_ROUTER") as router:
        router.complete.return_value = llm_json
        lead = extract_lead_info(
            "Senior TAM at Wiz",
            "Hi Alex, we have a role…",
            "recruiter@wiz.com",
        )

    assert lead == ("Wiz", "Senior TAM")
def test_extract_lead_info_returns_none_on_bad_json():
    """A non-JSON LLM reply produces ``(None, None)``."""
    from scripts.imap_sync import extract_lead_info

    unparseable_reply = "I cannot determine the company."

    with patch("scripts.imap_sync._CLASSIFIER_ROUTER") as router:
        router.complete.return_value = unparseable_reply
        lead = extract_lead_info("Job opportunity", "blah", "noreply@example.com")

    assert lead == (None, None)
def test_classify_labels_includes_survey_received():
    """``survey_received`` is one of the labels in _CLASSIFY_LABELS."""
    from scripts.imap_sync import _CLASSIFY_LABELS as known_labels

    assert "survey_received" in known_labels
def test_classify_stage_signal_returns_survey_received():
    """classify_stage_signal returns ``survey_received`` when the LLM emits it.

    ``patch`` comes from the module-level ``unittest.mock`` import, as in the
    other classifier tests, rather than being re-imported locally.
    """
    from scripts.imap_sync import classify_stage_signal

    with patch("scripts.imap_sync._CLASSIFIER_ROUTER") as mock_router:
        mock_router.complete.return_value = "survey_received"
        result = classify_stage_signal(
            "Complete our culture survey",
            "Please fill out this form",
        )

    assert result == "survey_received"
def test_sync_job_emails_classifies_inbound(tmp_path):
    """sync_job_emails stores the classifier's stage_signal on the contact.

    A single raw message is served by a mocked IMAP connection and
    classify_stage_signal is patched to return ``interview_scheduled``;
    the first stored contact for the job must carry that signal.
    """
    from scripts.db import init_db, insert_job, get_contacts
    from scripts.imap_sync import sync_job_emails

    db_path = tmp_path / "test.db"
    init_db(db_path)

    job_record = {
        "title": "CSM", "company": "Acme",
        "url": "https://acme.com/jobs/1",
        "source": "linkedin", "location": "Remote",
        "is_remote": True, "salary": "", "description": "",
        "date_found": "2026-02-21",
    }
    job_id = insert_job(db_path, job_record)
    job = {"id": job_id, "company": "Acme", "url": "https://acme.com/jobs/1"}

    # Minimal raw message: headers, blank line, then the body.
    raw_message = (
        b"From: recruiter@acme.com\r\n"
        b"To: alex@example.com\r\n"
        b"Subject: Interview Invitation\r\n"
        b"Message-ID: <unique-001@acme.com>\r\n"
        b"\r\n"
        b"Hi Alex, we'd like to schedule a phone screen."
    )

    # Mocked IMAP connection answering select/search/fetch with one message.
    imap = MagicMock()
    imap.select.return_value = ("OK", [b"1"])
    imap.search.return_value = ("OK", [b"1"])
    imap.fetch.return_value = ("OK", [(b"1 (RFC822 {123})", raw_message)])

    with patch("scripts.imap_sync.classify_stage_signal", return_value="interview_scheduled"):
        inbound_count, _outbound_count = sync_job_emails(
            job, imap, {"lookback_days": 90}, db_path
        )

    assert inbound_count == 1
    stored = get_contacts(db_path, job_id=job_id)
    assert stored[0]["stage_signal"] == "interview_scheduled"
def test_parse_linkedin_alert_extracts_jobs():
    """parse_linkedin_alert returns one entry per job card in an alert body.

    Each entry exposes title, company, location and url; the expected URLs
    have the ``/comm`` path segment and the tracking query string removed.
    """
    from scripts.imap_sync import parse_linkedin_alert

    alert_text = """\
Your job alert for customer success manager in United States
New jobs match your preferences.
Manage alerts: https://www.linkedin.com/comm/jobs/alerts?...
Customer Success Manager
Reflow
California, United States
View job: https://www.linkedin.com/comm/jobs/view/4376518925/?trackingId=abc%3D%3D&refId=xyz
---------------------------------------------------------
Customer Engagement Manager
Bitwarden
United States
2 school alumni
Apply with resume & profile
View job: https://www.linkedin.com/comm/jobs/view/4359824983/?trackingId=def%3D%3D
---------------------------------------------------------
"""
    parsed = parse_linkedin_alert(alert_text)

    assert len(parsed) == 2

    first_card = parsed[0]
    assert first_card["title"] == "Customer Success Manager"
    assert first_card["company"] == "Reflow"
    assert first_card["location"] == "California, United States"
    assert first_card["url"] == "https://www.linkedin.com/jobs/view/4376518925/"

    second_card = parsed[1]
    assert second_card["title"] == "Customer Engagement Manager"
    assert second_card["company"] == "Bitwarden"
    assert second_card["url"] == "https://www.linkedin.com/jobs/view/4359824983/"
def test_parse_linkedin_alert_skips_blocks_without_view_job():
    """A job card that has no ``View job:`` line is left out of the result."""
    from scripts.imap_sync import parse_linkedin_alert

    alert_text = """\
Customer Success Manager
Some Company
United States
---------------------------------------------------------
Valid Job Title
Valid Company
Remote
View job: https://www.linkedin.com/comm/jobs/view/1111111/?x=y
---------------------------------------------------------
"""
    parsed = parse_linkedin_alert(alert_text)

    # Only the second card carries a link, so only it survives.
    assert len(parsed) == 1
    assert parsed[0]["title"] == "Valid Job Title"
def test_parse_linkedin_alert_empty_body():
    """Bodies without any job card parse to an empty list."""
    from scripts.imap_sync import parse_linkedin_alert

    for cardless_body in ("", "No jobs here."):
        assert parse_linkedin_alert(cardless_body) == []
# ── _scan_unmatched_leads integration ─────────────────────────────────────────
# Plain-text body of a LinkedIn job-alert email containing two job cards.
_ALERT_BODY = """\
Your job alert for customer success manager in United States
New jobs match your preferences.
Customer Success Manager
Acme Corp
California, United States
View job: https://www.linkedin.com/comm/jobs/view/9999001/?trackingId=abc
---------------------------------------------------------
Director of Customer Success
Beta Inc
Remote
View job: https://www.linkedin.com/comm/jobs/view/9999002/?trackingId=def
---------------------------------------------------------
"""

# Parsed-message dict wrapping the alert body; the tests below feed it to the
# code under test as the return value of a patched _parse_message.
_ALERT_EMAIL = dict(
    message_id="<alert-001@linkedin.com>",
    from_addr="jobalerts-noreply@linkedin.com",
    to_addr="alex@example.com",
    subject="2 new jobs for customer success manager",
    body=_ALERT_BODY,
    date="2026-02-24 12:00:00",
)
def test_scan_unmatched_leads_linkedin_alert_inserts_jobs(tmp_path):
    """_scan_unmatched_leads turns a LinkedIn alert into one job row per card.

    With folder search and message parsing patched to serve _ALERT_EMAIL,
    the scan must report two new leads, record the alert's message id in
    ``known_ids``, insert both jobs with cleaned URLs and source
    ``linkedin``, and submit one ``scrape_url`` task per inserted job.
    """
    import sqlite3

    from scripts.db import init_db
    # Imported up front like the other _scan_unmatched_leads tests; patch and
    # MagicMock come from the module-level unittest.mock import.
    from scripts.imap_sync import _scan_unmatched_leads

    db_path = tmp_path / "test.db"
    init_db(db_path)
    conn_mock = MagicMock()
    known_ids: set = set()

    with patch("scripts.imap_sync._search_folder", return_value=[b"1"]), \
            patch("scripts.imap_sync._parse_message", return_value=_ALERT_EMAIL), \
            patch("scripts.task_runner.submit_task") as mock_submit:
        new_leads = _scan_unmatched_leads(
            conn_mock, {"lookback_days": 90}, db_path, known_ids
        )

    assert new_leads == 2
    # Message ID added so it won't be reprocessed
    assert "<alert-001@linkedin.com>" in known_ids

    # Both jobs inserted with correct fields
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    jobs = conn.execute("SELECT * FROM jobs ORDER BY id").fetchall()
    conn.close()

    assert len(jobs) == 2
    assert jobs[0]["title"] == "Customer Success Manager"
    assert jobs[0]["company"] == "Acme Corp"
    assert jobs[0]["url"] == "https://www.linkedin.com/jobs/view/9999001/"
    assert jobs[0]["source"] == "linkedin"
    assert jobs[1]["title"] == "Director of Customer Success"
    assert jobs[1]["url"] == "https://www.linkedin.com/jobs/view/9999002/"

    # scrape_url task submitted for each inserted job
    assert mock_submit.call_count == 2
    task_types = [call.args[1] for call in mock_submit.call_args_list]
    assert task_types == ["scrape_url", "scrape_url"]
def test_scan_unmatched_leads_linkedin_alert_skips_duplicates(tmp_path):
    """A job URL that is already in the DB is not inserted again.

    One of the two URLs in _ALERT_EMAIL is pre-inserted, so the scan must
    report exactly one new lead and submit exactly one task.
    """
    from scripts.db import init_db, insert_job
    # Imported up front like the other _scan_unmatched_leads tests; patch and
    # MagicMock come from the module-level unittest.mock import.
    from scripts.imap_sync import _scan_unmatched_leads

    db_path = tmp_path / "test.db"
    init_db(db_path)

    # Pre-insert one of the two URLs
    insert_job(db_path, {
        "title": "Customer Success Manager", "company": "Acme Corp",
        "url": "https://www.linkedin.com/jobs/view/9999001/",
        "source": "linkedin", "location": "", "is_remote": 0,
        "salary": "", "description": "", "date_found": "2026-02-24",
    })
    conn_mock = MagicMock()

    with patch("scripts.imap_sync._search_folder", return_value=[b"1"]), \
            patch("scripts.imap_sync._parse_message", return_value=_ALERT_EMAIL), \
            patch("scripts.task_runner.submit_task") as mock_submit:
        new_leads = _scan_unmatched_leads(
            conn_mock, {"lookback_days": 90}, db_path, set()
        )

    # Only one new job (the duplicate was skipped)
    assert new_leads == 1
    assert mock_submit.call_count == 1
def test_scan_unmatched_leads_linkedin_alert_skips_llm_path(tmp_path):
    """A LinkedIn alert email never reaches the LLM extraction step.

    extract_lead_info is patched with a mock that must stay uncalled after
    the scan has processed _ALERT_EMAIL.
    """
    from scripts.db import init_db
    # Imported up front like the other _scan_unmatched_leads tests; patch and
    # MagicMock come from the module-level unittest.mock import.
    from scripts.imap_sync import _scan_unmatched_leads

    db_path = tmp_path / "test.db"
    init_db(db_path)
    conn_mock = MagicMock()

    with patch("scripts.imap_sync._search_folder", return_value=[b"1"]), \
            patch("scripts.imap_sync._parse_message", return_value=_ALERT_EMAIL), \
            patch("scripts.task_runner.submit_task"), \
            patch("scripts.imap_sync.extract_lead_info") as mock_llm:
        _scan_unmatched_leads(conn_mock, {"lookback_days": 90}, db_path, set())

    # LLM extraction must never be called for alert emails
    mock_llm.assert_not_called()
# ── _has_rejection_or_ats_signal ──────────────────────────────────────────────
def test_rejection_phrase_at_body_boundary():
    """A rejection phrase starting right after 1500 chars of padding is ignored."""
    from scripts.imap_sync import _has_rejection_or_ats_signal

    # 750 repetitions of "x " give exactly 1500 chars, so "unfortunately"
    # begins immediately past that point.
    filler = "x " * 750
    late_rejection = filler + "unfortunately we will not be moving forward"

    flagged = _has_rejection_or_ats_signal("No subject match", late_rejection)
    assert flagged is False
def test_rejection_phrase_within_body_limit():
    """A rejection phrase at the start of a short body is detected."""
    from scripts.imap_sync import _has_rejection_or_ats_signal

    short_rejection = "We regret to inform you that we will not be moving forward."

    flagged = _has_rejection_or_ats_signal("Application Update", short_rejection)
    assert flagged is True
def test_dont_forget_right_single_quote():
    """"don't forget" spelled with U+2019 (right single quote) is flagged."""
    from scripts.imap_sync import _has_rejection_or_ats_signal

    reminder_body = "don\u2019t forget to complete your application"

    flagged = _has_rejection_or_ats_signal("Reminder", reminder_body)
    assert flagged is True
def test_dont_forget_left_single_quote():
    """"don't forget" spelled with U+2018 (left single quote) is flagged."""
    from scripts.imap_sync import _has_rejection_or_ats_signal

    reminder_body = "don\u2018t forget to complete your application"

    flagged = _has_rejection_or_ats_signal("Reminder", reminder_body)
    assert flagged is True
def test_ats_subject_phrase_not_matched_in_body_only():
    """An ATS confirmation phrase that appears only in the body is not flagged."""
    from scripts.imap_sync import _has_rejection_or_ats_signal

    # "thank you for applying" sits in the body; the subject is an ordinary
    # interview invitation, so the email must pass the filter.
    invitation_body = (
        "Hi Alex, thank you for applying to our Senior TAM role. We'd love to chat."
    )

    flagged = _has_rejection_or_ats_signal("Interview Invitation", invitation_body)
    assert flagged is False
def test_ats_subject_phrase_matched_in_subject():
    """An ATS confirmation phrase in the subject is flagged, even with an empty body."""
    from scripts.imap_sync import _has_rejection_or_ats_signal

    flagged = _has_rejection_or_ats_signal("Thank you for applying to Acme", "")
    assert flagged is True
def test_spam_subject_prefix_at_sign():
    """A subject that begins with ``@`` is flagged (social-commerce spam pattern)."""
    from scripts.imap_sync import _has_rejection_or_ats_signal

    flagged = _has_rejection_or_ats_signal("@user sent you a special offer", "")
    assert flagged is True
def test_rejection_uppercase_lowercased():
    """An all-caps 'UNFORTUNATELY' in the body is still flagged."""
    from scripts.imap_sync import _has_rejection_or_ats_signal

    shouted_body = "UNFORTUNATELY we have decided to go another direction."

    flagged = _has_rejection_or_ats_signal("Update", shouted_body)
    assert flagged is True
def test_rejection_phrase_in_quoted_thread_beyond_limit_not_blocked():
    """A rejection phrase quoted past the 1500-char body window does not block the email.

    The body is a clean intro of more than 1500 chars followed by a quoted
    reply containing "Unfortunately". The whole quoted footer therefore lies
    beyond the 1500-char window this suite's boundary tests describe.
    """
    from scripts.imap_sync import _has_rejection_or_ats_signal

    intro_sentence = "Hi Alex, we'd love to schedule a call with you. "  # 48 chars
    # 32 repetitions = 1536 chars. The previous 30 repetitions gave only 1440
    # chars, which put the quoted "Unfortunately" at offset 1465 -- still
    # inside the first 1500 chars, contradicting the intent of this test.
    clean_intro = intro_sentence * 32
    assert len(clean_intro) >= 1500  # precondition: footer starts past the window

    quoted_footer = "\n\nOn Mon, Jan 1 wrote:\n> Unfortunately we went with another candidate."
    body = clean_intro + quoted_footer

    # The phrase lands after the 1500-char cutoff — should NOT be blocked
    assert _has_rejection_or_ats_signal("Interview Invitation", body) is False
# ── _quote_folder ─────────────────────────────────────────────────────────────
def test_quote_folder_with_spaces():
    """A folder name containing spaces is wrapped in double quotes."""
    from scripts.imap_sync import _quote_folder

    quoted = _quote_folder("TO DO JOBS")
    assert quoted == '"TO DO JOBS"'
def test_quote_folder_no_spaces():
    """A folder name without spaces is returned unchanged."""
    from scripts.imap_sync import _quote_folder

    quoted = _quote_folder("INBOX")
    assert quoted == "INBOX"
def test_quote_folder_internal_double_quotes():
    """Embedded double quotes are backslash-escaped inside the quoted name."""
    from scripts.imap_sync import _quote_folder

    quoted = _quote_folder('My "Jobs"')
    assert quoted == '"My \\"Jobs\\""'
# ── _search_folder ────────────────────────────────────────────────────────────
def test_search_folder_nonexistent_returns_empty():
    """_search_folder returns ``[]`` when selecting the folder raises."""
    from scripts.imap_sync import _search_folder

    imap = MagicMock()
    imap.select.side_effect = Exception("NO folder not found")

    uids = _search_folder(imap, "DOES_NOT_EXIST", "ALL", "01-Jan-2026")

    assert uids == []
def test_search_folder_special_gmail_name():
    """``[Gmail]/All Mail`` is selected in quoted form and its hits are split.

    The mocked search returns ``b"1 2"``; the helper is expected to select
    the quoted folder read-only and return the two ids separately.
    """
    from scripts.imap_sync import _search_folder

    imap = MagicMock()
    imap.select.return_value = ("OK", [b"1"])
    imap.search.return_value = ("OK", [b"1 2"])

    uids = _search_folder(imap, "[Gmail]/All Mail", "ALL", "01-Jan-2026")

    # The folder name contains a space, so select must receive it quoted.
    imap.select.assert_called_once_with('"[Gmail]/All Mail"', readonly=True)
    assert uids == [b"1", b"2"]
# ── _get_existing_message_ids ─────────────────────────────────────────────────
def test_get_existing_message_ids_excludes_null(tmp_path):
    """Rows whose message_id is NULL do not appear in the returned id set.

    The contact row is written with raw SQL that omits the message_id
    column, then the helper's result is checked for both ``None`` and the
    empty string.
    """
    import sqlite3

    # The unused ``add_contact`` import was dropped; the row is inserted with
    # raw SQL below.
    from scripts.db import init_db, insert_job
    from scripts.imap_sync import _get_existing_message_ids

    db_path = tmp_path / "test.db"
    init_db(db_path)
    job_id = insert_job(db_path, {
        "title": "CSM", "company": "Acme", "url": "https://acme.com/1",
        "source": "test", "location": "", "is_remote": 0,
        "salary": "", "description": "", "date_found": "2026-01-01",
    })

    # Insert contact with NULL message_id via raw SQL
    conn = sqlite3.connect(db_path)
    conn.execute(
        "INSERT INTO job_contacts (job_id, direction, subject, from_addr, body, received_at) "
        "VALUES (?, 'inbound', 'subj', 'f@x.com', 'body', '2026-01-01')",
        (job_id,)
    )
    conn.commit()
    conn.close()

    ids = _get_existing_message_ids(job_id, db_path)
    assert None not in ids
    assert "" not in ids
def test_get_existing_message_ids_excludes_empty_string(tmp_path):
    """Rows whose message_id is the empty string are left out of the id set."""
    import sqlite3
    from scripts.db import init_db, insert_job
    from scripts.imap_sync import _get_existing_message_ids

    db_path = tmp_path / "test.db"
    init_db(db_path)

    job_record = {
        "title": "CSM", "company": "Acme", "url": "https://acme.com/2",
        "source": "test", "location": "", "is_remote": 0,
        "salary": "", "description": "", "date_found": "2026-01-01",
    }
    job_id = insert_job(db_path, job_record)

    # Write a contact row with message_id = '' directly through sqlite3.
    raw_db = sqlite3.connect(db_path)
    raw_db.execute(
        "INSERT INTO job_contacts (job_id, direction, subject, from_addr, body, received_at, message_id) "
        "VALUES (?, 'inbound', 'subj', 'f@x.com', 'body', '2026-01-01', '')",
        (job_id,)
    )
    raw_db.commit()
    raw_db.close()

    seen_ids = _get_existing_message_ids(job_id, db_path)
    assert "" not in seen_ids
def test_get_existing_message_ids_no_contacts(tmp_path):
    """A job without any contact rows yields an empty set."""
    from scripts.db import init_db, insert_job
    from scripts.imap_sync import _get_existing_message_ids

    db_path = tmp_path / "test.db"
    init_db(db_path)

    job_record = {
        "title": "CSM", "company": "Acme", "url": "https://acme.com/3",
        "source": "test", "location": "", "is_remote": 0,
        "salary": "", "description": "", "date_found": "2026-01-01",
    }
    job_id = insert_job(db_path, job_record)

    seen_ids = _get_existing_message_ids(job_id, db_path)
    assert seen_ids == set()
# ── _parse_message ────────────────────────────────────────────────────────────
def test_parse_message_no_message_id_returns_none():
    """A fetched message lacking a Message-ID header parses to None."""
    from scripts.imap_sync import _parse_message

    # Headers deliberately omit Message-ID.
    headerless_msg = (
        b"From: recruiter@acme.com\r\n"
        b"Subject: Interview Invitation\r\n"
        b"\r\n"
        b"Hi Alex!"
    )
    imap = MagicMock()
    imap.fetch.return_value = ("OK", [(b"1 (RFC822 {40})", headerless_msg)])

    parsed = _parse_message(imap, b"1")

    assert parsed is None
def test_parse_message_rfc2047_subject_decoded():
    """An RFC 2047 encoded-word subject comes back decoded."""
    from scripts.imap_sync import _parse_message

    # The subject is "Interview" as a UTF-8 / base64 encoded word.
    encoded_msg = (
        b"From: recruiter@acme.com\r\n"
        b"Message-ID: <decode-test@acme.com>\r\n"
        b"Subject: =?utf-8?b?SW50ZXJ2aWV3?=\r\n"
        b"\r\n"
        b"Let's schedule a call."
    )
    imap = MagicMock()
    imap.fetch.return_value = ("OK", [(b"1 (RFC822 {100})", encoded_msg)])

    parsed = _parse_message(imap, b"1")

    assert parsed is not None
    assert "Interview" in parsed["subject"]
# ── classify_stage_signal ─────────────────────────────────────────────────────
def test_classify_stage_signal_returns_neutral_on_no_label_match():
    """LLM output that matches no known label is reported as ``neutral``."""
    from scripts.imap_sync import classify_stage_signal

    off_script_reply = "I cannot determine the category."

    with patch("scripts.imap_sync._CLASSIFIER_ROUTER") as router:
        router.complete.return_value = off_script_reply
        signal = classify_stage_signal("Generic update", "No clear signal here.")

    assert signal == "neutral"
# ── extract_lead_info ─────────────────────────────────────────────────────────
def test_extract_lead_info_returns_none_on_llm_error():
    """An exception from the LLM call produces ``(None, None)``."""
    from scripts.imap_sync import extract_lead_info

    llm_failure = RuntimeError("timeout")

    with patch("scripts.imap_sync._CLASSIFIER_ROUTER") as router:
        router.complete.side_effect = llm_failure
        lead = extract_lead_info("Senior TAM at Wiz", "Hi Alex…", "r@wiz.com")

    assert lead == (None, None)
# ── _scan_unmatched_leads — signal gating ─────────────────────────────────────
# Parsed-message dict for an ordinary recruiter email; the signal-gating tests
# below serve it as the return value of a patched _parse_message.
_PLAIN_RECRUIT_EMAIL = dict(
    message_id="<recruit-001@acme.com>",
    from_addr="recruiter@acme.com",
    to_addr="alex@example.com",
    subject="Interview Opportunity at Acme",
    body="Hi Alex, we have an exciting opportunity for you.",
    date="2026-02-25 10:00:00",
)
def test_scan_unmatched_leads_skips_when_signal_none(tmp_path):
    """A None stage signal means no lead is counted and no extraction runs."""
    from scripts.db import init_db
    from scripts.imap_sync import _scan_unmatched_leads

    db_path = tmp_path / "test.db"
    init_db(db_path)
    cfg = {"lookback_days": 90}

    with patch("scripts.imap_sync._search_folder", return_value=[b"1"]), \
            patch("scripts.imap_sync._parse_message", return_value=_PLAIN_RECRUIT_EMAIL), \
            patch("scripts.imap_sync.classify_stage_signal", return_value=None), \
            patch("scripts.imap_sync.extract_lead_info") as extractor:
        lead_count = _scan_unmatched_leads(MagicMock(), cfg, db_path, set())

    assert lead_count == 0
    extractor.assert_not_called()
def test_scan_unmatched_leads_skips_when_signal_rejected(tmp_path):
    """A ``rejected`` stage signal means no lead is counted and no extraction runs."""
    from scripts.db import init_db
    from scripts.imap_sync import _scan_unmatched_leads

    db_path = tmp_path / "test.db"
    init_db(db_path)
    cfg = {"lookback_days": 90}

    with patch("scripts.imap_sync._search_folder", return_value=[b"1"]), \
            patch("scripts.imap_sync._parse_message", return_value=_PLAIN_RECRUIT_EMAIL), \
            patch("scripts.imap_sync.classify_stage_signal", return_value="rejected"), \
            patch("scripts.imap_sync.extract_lead_info") as extractor:
        lead_count = _scan_unmatched_leads(MagicMock(), cfg, db_path, set())

    assert lead_count == 0
    extractor.assert_not_called()
def test_scan_unmatched_leads_proceeds_when_signal_neutral(tmp_path):
    """A ``neutral`` stage signal still lets extraction run and counts one lead."""
    from scripts.db import init_db
    from scripts.imap_sync import _scan_unmatched_leads

    db_path = tmp_path / "test.db"
    init_db(db_path)
    cfg = {"lookback_days": 90}
    extracted = ("Acme", "Senior TAM")

    with patch("scripts.imap_sync._search_folder", return_value=[b"1"]), \
            patch("scripts.imap_sync._parse_message", return_value=_PLAIN_RECRUIT_EMAIL), \
            patch("scripts.imap_sync.classify_stage_signal", return_value="neutral"), \
            patch("scripts.imap_sync.extract_lead_info", return_value=extracted), \
            patch("scripts.task_runner.submit_task"):
        lead_count = _scan_unmatched_leads(MagicMock(), cfg, db_path, set())

    assert lead_count == 1
def test_scan_unmatched_leads_rejection_phrase_blocks_llm(tmp_path):
    """A rejection phrase in the body filters the email before classification.

    classify_stage_signal is patched with a mock that must stay uncalled.
    """
    from scripts.db import init_db
    from scripts.imap_sync import _scan_unmatched_leads

    db_path = tmp_path / "test.db"
    init_db(db_path)

    # Same recruiter email, but with a rejection sentence as its body.
    rejection_email = dict(
        _PLAIN_RECRUIT_EMAIL,
        body="Unfortunately we have decided not to move forward.",
    )
    cfg = {"lookback_days": 90}

    with patch("scripts.imap_sync._search_folder", return_value=[b"1"]), \
            patch("scripts.imap_sync._parse_message", return_value=rejection_email), \
            patch("scripts.imap_sync.classify_stage_signal") as classifier:
        lead_count = _scan_unmatched_leads(MagicMock(), cfg, db_path, set())

    assert lead_count == 0
    classifier.assert_not_called()
def test_scan_unmatched_leads_genuine_lead_has_synthetic_url(tmp_path):
    """An accepted lead is stored with a URL that starts with ``email://``."""
    import sqlite3
    from scripts.db import init_db
    from scripts.imap_sync import _scan_unmatched_leads

    db_path = tmp_path / "test.db"
    init_db(db_path)
    cfg = {"lookback_days": 90}
    extracted = ("Acme", "Senior TAM")

    with patch("scripts.imap_sync._search_folder", return_value=[b"1"]), \
            patch("scripts.imap_sync._parse_message", return_value=_PLAIN_RECRUIT_EMAIL), \
            patch("scripts.imap_sync.classify_stage_signal", return_value="interview_scheduled"), \
            patch("scripts.imap_sync.extract_lead_info", return_value=extracted), \
            patch("scripts.task_runner.submit_task"):
        lead_count = _scan_unmatched_leads(MagicMock(), cfg, db_path, set())

    assert lead_count == 1

    # Read the stored URL straight from the jobs table.
    raw_db = sqlite3.connect(db_path)
    url_row = raw_db.execute("SELECT url FROM jobs LIMIT 1").fetchone()
    raw_db.close()
    assert url_row[0].startswith("email://")
def test_scan_unmatched_leads_no_reinsert_on_second_run(tmp_path):
    """Scanning the same email twice with a shared id set inserts it once.

    Both runs receive the same ``known_message_ids`` set, so the second run
    is expected to report zero new leads.
    """
    from scripts.db import init_db
    from scripts.imap_sync import _scan_unmatched_leads

    db_path = tmp_path / "test.db"
    init_db(db_path)

    seen_ids = set()
    # Arguments common to both runs, passed by keyword.
    common_args = {
        "conn": MagicMock(),
        "cfg": {"lookback_days": 90},
        "db_path": db_path,
    }

    with patch("scripts.imap_sync._search_folder", return_value=[b"1"]), \
            patch("scripts.imap_sync._parse_message", return_value=_PLAIN_RECRUIT_EMAIL), \
            patch("scripts.imap_sync.classify_stage_signal", return_value="neutral"), \
            patch("scripts.imap_sync.extract_lead_info", return_value=("Acme", "TAM")), \
            patch("scripts.task_runner.submit_task"):
        first_run = _scan_unmatched_leads(**common_args, known_message_ids=seen_ids)
        second_run = _scan_unmatched_leads(**common_args, known_message_ids=seen_ids)

    assert first_run == 1
    assert second_run == 0
def test_scan_unmatched_leads_extract_none_no_insert(tmp_path):
    """An extraction result of ``(None, None)`` leaves the jobs table empty."""
    import sqlite3
    from scripts.db import init_db
    from scripts.imap_sync import _scan_unmatched_leads

    db_path = tmp_path / "test.db"
    init_db(db_path)
    cfg = {"lookback_days": 90}

    with patch("scripts.imap_sync._search_folder", return_value=[b"1"]), \
            patch("scripts.imap_sync._parse_message", return_value=_PLAIN_RECRUIT_EMAIL), \
            patch("scripts.imap_sync.classify_stage_signal", return_value="neutral"), \
            patch("scripts.imap_sync.extract_lead_info", return_value=(None, None)):
        lead_count = _scan_unmatched_leads(MagicMock(), cfg, db_path, set())

    assert lead_count == 0

    # Confirm directly against the database that nothing was written.
    raw_db = sqlite3.connect(db_path)
    job_rows = raw_db.execute("SELECT COUNT(*) FROM jobs").fetchone()[0]
    raw_db.close()
    assert job_rows == 0
# ── _scan_todo_label ──────────────────────────────────────────────────────────
def _make_job(db_path, company="Acme", url="https://acme.com/job/1"):
    """Initialise the database at *db_path* and insert one test job.

    Args:
        db_path: Location of the database file to initialise and write to.
        company: Company name stored on the job row.
        url: URL stored on the job row.

    Returns:
        Whatever ``insert_job`` returns for the new row (the tests use it
        as the job id).
    """
    from scripts.db import init_db, insert_job

    init_db(db_path)
    job_record = {
        "title": "CSM", "company": company, "url": url,
        "source": "test", "location": "", "is_remote": 0,
        "salary": "", "description": "", "date_found": "2026-01-01",
    }
    return insert_job(db_path, job_record)
def test_scan_todo_label_empty_string_returns_zero(tmp_path):
    """An empty ``todo_label`` setting makes _scan_todo_label return 0."""
    from scripts.imap_sync import _scan_todo_label

    db_path = tmp_path / "test.db"
    _make_job(db_path)

    attached = _scan_todo_label(MagicMock(), {"todo_label": ""}, db_path, [], set())
    assert attached == 0
def test_scan_todo_label_missing_key_returns_zero(tmp_path):
    """A config without a ``todo_label`` key makes _scan_todo_label return 0."""
    from scripts.imap_sync import _scan_todo_label

    db_path = tmp_path / "test.db"
    _make_job(db_path)

    attached = _scan_todo_label(MagicMock(), {}, db_path, [], set())
    assert attached == 0
def test_scan_todo_label_folder_not_found_returns_zero(tmp_path):
    """An empty folder search (e.g. folder missing) yields 0 without raising."""
    from scripts.imap_sync import _scan_todo_label

    db_path = tmp_path / "test.db"
    _make_job(db_path)
    cfg = {"todo_label": "TO DO JOBS", "lookback_days": 90}

    with patch("scripts.imap_sync._search_folder", return_value=[]):
        attached = _scan_todo_label(MagicMock(), cfg, db_path, [], set())

    assert attached == 0
def test_scan_todo_label_email_matches_company_and_keyword(tmp_path):
    """An email naming the company and an action keyword is attached to the job.

    The scan must report one attachment and the job must end up with a
    single contact carrying the email's subject.
    """
    from scripts.db import get_contacts
    from scripts.imap_sync import _scan_todo_label

    db_path = tmp_path / "test.db"
    job_id = _make_job(db_path)
    active_jobs = [{"id": job_id, "company": "Acme", "url": "https://acme.com/job/1"}]
    cfg = {"todo_label": "TO DO JOBS", "lookback_days": 90}

    matching_email = {
        "message_id": "<todo-001@acme.com>",
        "from_addr": "recruiter@acme.com",
        "to_addr": "alex@example.com",
        "subject": "Interview scheduled with Acme",
        "body": "Hi Alex, your interview is confirmed.",
        "date": "2026-02-25 10:00:00",
    }

    with patch("scripts.imap_sync._search_folder", return_value=[b"1"]), \
            patch("scripts.imap_sync._parse_message", return_value=matching_email), \
            patch("scripts.imap_sync.classify_stage_signal", return_value="neutral"):
        attached = _scan_todo_label(MagicMock(), cfg, db_path, active_jobs, set())

    assert attached == 1
    stored = get_contacts(db_path, job_id=job_id)
    assert len(stored) == 1
    assert stored[0]["subject"] == "Interview scheduled with Acme"
def test_scan_todo_label_no_action_keyword_skipped(tmp_path):
    """A company match without any action keyword is not attached."""
    from scripts.imap_sync import _scan_todo_label

    db_path = tmp_path / "test.db"
    job_id = _make_job(db_path)
    active_jobs = [{"id": job_id, "company": "Acme", "url": "https://acme.com/job/1"}]
    cfg = {"todo_label": "TO DO JOBS", "lookback_days": 90}

    # Mentions Acme, but reads like a newsletter rather than a to-do item.
    newsletter_email = {
        "message_id": "<todo-002@acme.com>",
        "from_addr": "noreply@acme.com",
        "to_addr": "alex@example.com",
        "subject": "Acme newsletter",
        "body": "Company updates this week.",
        "date": "2026-02-25 10:00:00",
    }

    with patch("scripts.imap_sync._search_folder", return_value=[b"1"]), \
            patch("scripts.imap_sync._parse_message", return_value=newsletter_email):
        attached = _scan_todo_label(MagicMock(), cfg, db_path, active_jobs, set())

    assert attached == 0
def test_scan_todo_label_no_company_match_skipped(tmp_path):
    """An email that names a different company is not attached to the job."""
    from scripts.imap_sync import _scan_todo_label

    db_path = tmp_path / "test.db"
    job_id = _make_job(db_path, company="Acme")
    active_jobs = [{"id": job_id, "company": "Acme", "url": "https://acme.com/job/1"}]
    cfg = {"todo_label": "TO DO JOBS", "lookback_days": 90}

    # Sender, subject and body all refer to OtherCo, never to Acme.
    other_company_email = {
        "message_id": "<todo-003@other.com>",
        "from_addr": "recruiter@other.com",
        "to_addr": "alex@example.com",
        "subject": "Interview scheduled with OtherCo",
        "body": "Hi Alex, interview with OtherCo confirmed.",
        "date": "2026-02-25 10:00:00",
    }

    with patch("scripts.imap_sync._search_folder", return_value=[b"1"]), \
            patch("scripts.imap_sync._parse_message", return_value=other_company_email):
        attached = _scan_todo_label(MagicMock(), cfg, db_path, active_jobs, set())

    assert attached == 0
def test_scan_todo_label_duplicate_message_id_not_reinserted(tmp_path):
    """An email whose id is already in known_message_ids is not attached again."""
    from scripts.imap_sync import _scan_todo_label

    db_path = tmp_path / "test.db"
    job_id = _make_job(db_path)
    active_jobs = [{"id": job_id, "company": "Acme", "url": "https://acme.com/job/1"}]
    cfg = {"todo_label": "TO DO JOBS", "lookback_days": 90}

    seen_email = {
        "message_id": "<already-seen@acme.com>",
        "from_addr": "recruiter@acme.com",
        "to_addr": "alex@example.com",
        "subject": "Interview scheduled with Acme",
        "body": "Hi Alex.",
        "date": "2026-02-25 10:00:00",
    }
    # The id set already contains this email's Message-ID.
    seen_ids = {"<already-seen@acme.com>"}

    with patch("scripts.imap_sync._search_folder", return_value=[b"1"]), \
            patch("scripts.imap_sync._parse_message", return_value=seen_email):
        attached = _scan_todo_label(MagicMock(), cfg, db_path, active_jobs, seen_ids)

    assert attached == 0
def test_scan_todo_label_stage_signal_set_for_non_neutral(tmp_path):
    """A non-neutral classifier result is persisted on the contact row."""
    import sqlite3
    from scripts.imap_sync import _scan_todo_label

    db_path = tmp_path / "test.db"
    job_id = _make_job(db_path)
    active_jobs = [{"id": job_id, "company": "Acme", "url": "https://acme.com/job/1"}]
    cfg = {"todo_label": "TO DO JOBS", "lookback_days": 90}

    confirmed_email = {
        "message_id": "<signal-001@acme.com>",
        "from_addr": "recruiter@acme.com",
        "to_addr": "alex@example.com",
        "subject": "Interview scheduled with Acme",
        "body": "Your phone screen is confirmed.",
        "date": "2026-02-25 10:00:00",
    }

    with patch("scripts.imap_sync._search_folder", return_value=[b"1"]), \
            patch("scripts.imap_sync._parse_message", return_value=confirmed_email), \
            patch("scripts.imap_sync.classify_stage_signal", return_value="interview_scheduled"):
        _scan_todo_label(MagicMock(), cfg, db_path, active_jobs, set())

    # Read the stored signal straight from the job_contacts table.
    raw_db = sqlite3.connect(db_path)
    signal_row = raw_db.execute("SELECT stage_signal FROM job_contacts LIMIT 1").fetchone()
    raw_db.close()
    assert signal_row[0] == "interview_scheduled"
def test_scan_todo_label_body_fallback_matches(tmp_path):
    """Company name only in body[:300] still triggers a match (body fallback).

    The sender is an ATS address and the subject omits the company, so the
    only place "Acme" appears is the message body.
    """
    from scripts.imap_sync import _scan_todo_label

    db_path = tmp_path / "test.db"
    job_id = _make_job(db_path, company="Acme")
    active_jobs = [{"id": job_id, "company": "Acme", "url": "https://acme.com/job/1"}]
    # Company not in from_addr or subject — only in body
    body_only_email = {
        "message_id": "<body-fallback@noreply.greenhouse.io>",
        "from_addr": "noreply@greenhouse.io",
        "to_addr": "alex@example.com",
        "subject": "Interview scheduled",
        "body": "Your interview with Acme has been confirmed for tomorrow.",
        "date": "2026-02-25 10:00:00",
    }

    # Folder search, parsing and classification are stubbed; only the matching
    # logic inside _scan_todo_label is exercised.
    with patch("scripts.imap_sync._search_folder", return_value=[b"1"]), \
         patch("scripts.imap_sync._parse_message", return_value=body_only_email), \
         patch("scripts.imap_sync.classify_stage_signal", return_value="neutral"):
        result = _scan_todo_label(
            MagicMock(), {"todo_label": "TO DO JOBS", "lookback_days": 90},
            db_path, active_jobs, set()
        )

    assert result == 1
# ── sync_all ──────────────────────────────────────────────────────────────────
def test_sync_all_no_active_jobs_returns_full_dict(tmp_path):
    """sync_all with no active jobs still returns all 6 summary keys."""
    from scripts.db import init_db
    from scripts.imap_sync import sync_all

    db_path = tmp_path / "test.db"
    init_db(db_path)

    # Empty config and an empty job mapping: nothing to sync.
    empty_config = patch("scripts.imap_sync.load_config", return_value={})
    no_jobs = patch("scripts.imap_sync.get_interview_jobs", return_value={})
    with empty_config, no_jobs:
        summary = sync_all(db_path=db_path)

    assert set(summary.keys()) == {
        "synced",
        "inbound",
        "outbound",
        "new_leads",
        "todo_attached",
        "errors",
    }
    assert summary["todo_attached"] == 0
def test_sync_all_on_stage_callback_fires(tmp_path):
    """sync_all reports the expected stage labels through its on_stage hook."""
    from scripts.db import init_db
    from scripts.imap_sync import sync_all

    db_path = tmp_path / "test.db"
    init_db(db_path)

    imap_conn = MagicMock()
    imap_conn.logout.return_value = ("OK", [])
    jobs_by_stage = {
        "applied": [{"id": 1, "company": "Acme", "url": "https://acme.com/1"}],
    }
    # Every label passed to on_stage is collected here, in call order.
    reported = []

    with patch("scripts.imap_sync.load_config", return_value={}), \
         patch("scripts.imap_sync.get_interview_jobs", return_value=jobs_by_stage), \
         patch("scripts.imap_sync.connect", return_value=imap_conn), \
         patch("scripts.imap_sync.sync_job_emails", return_value=(0, 0)), \
         patch("scripts.db.get_all_message_ids", return_value=set()), \
         patch("scripts.imap_sync._scan_todo_label", return_value=0), \
         patch("scripts.imap_sync._scan_unmatched_leads", return_value=0):
        sync_all(db_path=db_path, on_stage=reported.append)

    for label in ("connecting", "scanning todo label", "scanning leads"):
        assert label in reported
def test_sync_all_per_job_exception_continues(tmp_path):
    """A failure while syncing one job must not stop the remaining jobs."""
    from scripts.db import init_db
    from scripts.imap_sync import sync_all

    db_path = tmp_path / "test.db"
    init_db(db_path)

    imap_conn = MagicMock()
    imap_conn.logout.return_value = ("OK", [])
    jobs_by_stage = {
        "applied": [
            {"id": 1, "company": "Co0", "url": "https://co0.com/1"},
            {"id": 2, "company": "Co1", "url": "https://co1.com/1"},
        ],
    }

    attempted = []

    def fail_first_then_succeed(job, *args, **kwargs):
        # Only the very first invocation blows up; later ones report one synced email.
        attempted.append(job)
        if len(attempted) == 1:
            raise RuntimeError("IMAP timeout")
        return (1, 0)

    with patch("scripts.imap_sync.load_config", return_value={}), \
         patch("scripts.imap_sync.get_interview_jobs", return_value=jobs_by_stage), \
         patch("scripts.imap_sync.connect", return_value=imap_conn), \
         patch("scripts.imap_sync.sync_job_emails", side_effect=fail_first_then_succeed), \
         patch("scripts.db.get_all_message_ids", return_value=set()), \
         patch("scripts.imap_sync._scan_todo_label", return_value=0), \
         patch("scripts.imap_sync._scan_unmatched_leads", return_value=0):
        summary = sync_all(db_path=db_path)

    # One error recorded for the first job; the second job went through.
    assert len(summary["errors"]) == 1
    assert summary["synced"] == 1
# ── Performance / edge cases ──────────────────────────────────────────────────
def test_parse_message_large_body_truncated():
    """A 10,000-character body comes back from _parse_message capped at 4000 chars."""
    from scripts.imap_sync import _parse_message

    header_block = (
        b"From: r@acme.com\r\n"
        b"Message-ID: <big@acme.com>\r\n"
        b"Subject: Interview\r\n"
        b"\r\n"
    )
    oversized_payload = b"x" * 10_000
    raw_message = header_block + oversized_payload

    # Fake IMAP connection whose fetch() hands back the raw RFC822 bytes.
    imap = MagicMock()
    imap.fetch.return_value = ("OK", [(b"1 (RFC822)", raw_message)])

    parsed = _parse_message(imap, b"1")

    assert parsed is not None
    assert len(parsed["body"]) <= 4000
def test_parse_message_binary_attachment_no_crash():
    """Email with binary attachment returns a valid dict without crashing.

    The message is a multipart with one text/plain part and one binary
    application part; parsing must succeed and preserve the Message-ID.
    """
    from email.mime.application import MIMEApplication
    from email.mime.multipart import MIMEMultipart
    from email.mime.text import MIMEText
    from scripts.imap_sync import _parse_message

    msg = MIMEMultipart()
    msg["From"] = "r@acme.com"
    msg["Message-ID"] = "<attach@acme.com>"
    msg["Subject"] = "Offer letter attached"
    msg.attach(MIMEText("Please find the attached offer letter.", "plain"))
    # Non-text bytes (including NULs) stand in for a real PDF payload.
    msg.attach(MIMEApplication(b"\x00\x01\x02\x03" * 100, Name="offer.pdf"))

    # Fake IMAP connection whose fetch() returns the serialized message.
    conn = MagicMock()
    conn.fetch.return_value = ("OK", [(b"1 (RFC822)", msg.as_bytes())])

    result = _parse_message(conn, b"1")

    assert result is not None
    assert result["message_id"] == "<attach@acme.com>"
def test_parse_message_multiple_text_parts_takes_first():
    """With two text/plain parts, only the first one ends up in the parsed body."""
    from scripts.imap_sync import _parse_message
    from email.mime.multipart import MIMEMultipart
    from email.mime.text import MIMEText

    message = MIMEMultipart()
    for header, value in (
        ("From", "r@acme.com"),
        ("Message-ID", "<multipart@acme.com>"),
        ("Subject", "Interview"),
    ):
        message[header] = value
    for text in ("First part — the real body.", "Second part — should be ignored."):
        message.attach(MIMEText(text, "plain"))

    # Fake IMAP connection whose fetch() returns the serialized message.
    imap = MagicMock()
    imap.fetch.return_value = ("OK", [(b"1 (RFC822)", message.as_bytes())])

    parsed = _parse_message(imap, b"1")

    assert parsed is not None
    body = parsed["body"]
    assert "First part" in body
    assert "Second part" not in body
def test_get_all_message_ids_performance(tmp_path):
    """get_all_message_ids with 1000 rows completes quickly (smoke test for scale).

    Seeds 1000 ``job_contacts`` rows with distinct message IDs for a single
    job, then checks that all IDs come back and the call takes under a second.
    """
    import sqlite3
    import time
    from contextlib import closing
    from scripts.db import init_db, insert_job
    from scripts.db import get_all_message_ids

    db_path = tmp_path / "test.db"
    init_db(db_path)
    job_id = insert_job(db_path, {
        "title": "CSM", "company": "Acme", "url": "https://acme.com/perf",
        "source": "test", "location": "", "is_remote": 0,
        "salary": "", "description": "", "date_found": "2026-01-01",
    })

    # closing() releases the connection even if the bulk insert fails, so a
    # broken schema cannot leave the temp database file locked.
    with closing(sqlite3.connect(db_path)) as conn:
        conn.executemany(
            "INSERT INTO job_contacts (job_id, direction, subject, from_addr, body, received_at, message_id) "
            "VALUES (?, 'inbound', 'subj', 'f@x.com', 'body', '2026-01-01', ?)",
            [(job_id, f"<mid-{i}@x.com>") for i in range(1000)]
        )
        conn.commit()

    # Time only the call under test, not the fixture setup above.
    start = time.monotonic()
    ids = get_all_message_ids(db_path)
    elapsed = time.monotonic() - start

    assert len(ids) == 1000
    assert elapsed < 1.0, f"get_all_message_ids took {elapsed:.3f}s for 1000 rows"