# ── app/imap_fetch.py ────────────────────────────────────────────────────────
"""Avocet — IMAP fetch utilities.

Shared between app/api.py (FastAPI SSE endpoint) and app/label_tool.py (Streamlit).
No Streamlit imports here — stdlib + imaplib only.
"""
from __future__ import annotations

import email as _email_lib
import hashlib
import imaplib
import logging
import re
from datetime import date, datetime, timedelta
from email.errors import HeaderParseError
from email.header import decode_header as _raw_decode
from html.parser import HTMLParser
from typing import Any, Iterator

__all__ = ["entry_key", "fetch_account_stream", "strip_html", "test_connection"]

logger = logging.getLogger(__name__)


# ── HTML → plain text ────────────────────────────────────────────────────────

class _TextExtractor(HTMLParser):
    """HTMLParser that collects the non-blank text nodes of a document."""

    def __init__(self) -> None:
        super().__init__()
        self._parts: list[str] = []

    def handle_data(self, data: str) -> None:
        """Record one text node, ignoring whitespace-only nodes."""
        stripped = data.strip()
        if stripped:
            self._parts.append(stripped)

    def get_text(self) -> str:
        """Return every collected text node joined by single spaces."""
        return " ".join(self._parts)


def strip_html(html_str: str) -> str:
    """Return the visible text of *html_str* with all markup removed.

    Falls back to a crude regex tag strip if the parser raises.
    """
    try:
        ex = _TextExtractor()
        ex.feed(html_str)
        # feed() holds back trailing text that contains an unterminated "&"
        # (it could be a character reference cut in half); close() flushes it.
        ex.close()
        return ex.get_text()
    except Exception:
        return re.sub(r"<[^>]+>", " ", html_str).strip()


# ── IMAP decode helpers ───────────────────────────────────────────────────────

def _decode_str(value: str | None) -> str:
    """Decode an RFC 2047 encoded header value into plain text.

    Returns "" for a missing/empty header. Decoded chunks are joined with a
    single space; this is kept as-is because the result feeds entry_key().
    """
    if not value:
        return ""
    try:
        chunks = _raw_decode(value)
    except HeaderParseError:
        # Malformed encoded-word: keep the raw header rather than losing the mail.
        return str(value).strip()
    out: list[str] = []
    for chunk, enc in chunks:
        if isinstance(chunk, bytes):
            try:
                out.append(chunk.decode(enc or "utf-8", errors="replace"))
            except LookupError:
                # Charset label Python does not know (e.g. "unknown-8bit").
                out.append(chunk.decode("utf-8", errors="replace"))
        else:
            out.append(str(chunk))
    return " ".join(out).strip()


def _part_text(part: Any) -> str | None:
    """Return the decoded text payload of *part*, or None if it cannot be decoded."""
    try:
        charset = part.get_content_charset() or "utf-8"
        return part.get_payload(decode=True).decode(charset, errors="replace")
    except Exception:
        # Deliberately broad: a broken part must never abort body extraction.
        return None


def _extract_body(msg: Any) -> str:
    """Return the best plain-text body of an email message.

    For multipart messages the first decodable text/plain part wins; otherwise
    the first decodable text/html part is used, stripped of markup. Returns ""
    when nothing can be decoded.
    """
    if not msg.is_multipart():
        raw = _part_text(msg)
        if raw is None:
            return ""
        return strip_html(raw) if msg.get_content_type() == "text/html" else raw

    html_fallback: str | None = None
    for part in msg.walk():
        ct = part.get_content_type()
        if ct == "text/plain":
            text = _part_text(part)
            if text is not None:
                return text
        elif ct == "text/html" and html_fallback is None:
            raw = _part_text(part)
            if raw is not None:
                html_fallback = strip_html(raw)
    return html_fallback or ""


def entry_key(e: dict) -> str:
    """Stable MD5 content-hash for dedup — matches label_tool.py _entry_key.

    Hashes the subject plus the first 100 characters of the body. A missing or
    None subject/body is treated as the empty string.
    """
    key = (e.get("subject") or "") + (e.get("body") or "")[:100]
    return hashlib.md5(key.encode("utf-8", errors="replace")).hexdigest()


# ── Wide search terms ────────────────────────────────────────────────────────

_WIDE_TERMS: list[str] = [
    "interview", "phone screen", "video call", "zoom link", "schedule a call",
    "offer letter", "job offer", "offer of employment", "pleased to offer",
    "unfortunately", "not moving forward", "other candidates", "regret to inform",
    "no longer", "decided not to", "decided to go with",
    "opportunity", "interested in your background", "reached out", "great fit",
    "exciting role", "love to connect",
    "assessment", "questionnaire", "culture fit", "culture-fit", "online assessment",
    "application received", "thank you for applying", "application confirmation",
    "you applied", "your application for",
    "reschedule", "rescheduled", "new time", "moved to", "postponed", "new date",
    "job digest", "jobs you may like", "recommended jobs", "jobs for you",
    "new jobs", "job alert",
    "came across your profile", "reaching out about", "great fit for a role",
    "exciting opportunity",
    "welcome to the team", "start date", "onboarding", "first day", "we're excited to have you",
    "application", "recruiter", "recruiting", "hiring", "candidate",
]


# ── Connection helpers ───────────────────────────────────────────────────────

# IMAP date strings use fixed English month abbreviations; strftime("%b")
# follows the process locale, so the names are spelled out here.
_IMAP_MONTHS = (
    "Jan", "Feb", "Mar", "Apr", "May", "Jun",
    "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
)


def _imap_date(day: date) -> str:
    """Format *day* as an IMAP search date, e.g. "05-Mar-2026"."""
    return f"{day.day:02d}-{_IMAP_MONTHS[day.month - 1]}-{day.year:04d}"


def _connect(host: str, port: int, use_ssl: bool) -> imaplib.IMAP4:
    """Open an IMAP connection, over SSL when *use_ssl* is truthy."""
    factory = imaplib.IMAP4_SSL if use_ssl else imaplib.IMAP4
    return factory(host, port)


def _safe_logout(conn: Any) -> None:
    """Log out of *conn*, ignoring any error (used on cleanup paths)."""
    try:
        conn.logout()
    except Exception:
        # Cleanup must never mask the error that led here.
        logger.debug("IMAP logout failed", exc_info=True)


# ── Public API ────────────────────────────────────────────────────────────────

def test_connection(acc: dict) -> tuple[bool, str, int | None]:
    """Connect, login, select folder. Returns (ok, human_message, message_count|None).

    Never raises for connection problems: a bad port, a failed login or a
    folder the server refuses to open all come back as (False, message, None).
    The connection is always logged out before returning.
    """
    host = acc.get("host", "")
    username = acc.get("username", "")
    password = acc.get("password", "")
    folder = acc.get("folder", "INBOX")
    if not host or not username or not password:
        return False, "Host, username, and password are all required.", None
    try:
        port = int(acc.get("port", 993))
    except (TypeError, ValueError):
        return False, f"Invalid port: {acc.get('port')!r}", None

    conn = None
    try:
        conn = _connect(host, port, acc.get("use_ssl", True))
        conn.login(username, password)
        status, data = conn.select(folder, readonly=True)
        first = data[0] if data else None
        detail = first.decode(errors="replace") if isinstance(first, bytes) else ""
        # select() reports a refused folder via its status, not by raising.
        if status != "OK":
            return False, f"Could not open folder {folder}: {detail or status}", None
        count = int(detail) if detail.isdigit() else 0
        return True, f"Connected — {count:,} message(s) in {folder}.", count
    except Exception as exc:
        return False, str(exc), None
    finally:
        if conn is not None:
            _safe_logout(conn)


# Not a pytest test despite its name: keeps pytest from collecting it when a
# test module imports it at module level.
test_connection.__test__ = False  # type: ignore[attr-defined]


def fetch_account_stream(
    acc: dict,
    days_back: int,
    limit: int,
    known_keys: set[str],
) -> Iterator[dict]:
    """Generator — yields progress dicts while fetching emails via IMAP.

    Mutates `known_keys` in place for cross-account dedup within one fetch session.
    The connection is logged out when the generator finishes, fails, or is
    closed early by the consumer.

    Yields event dicts with "type" key:
      {"type": "start",    "account": str, "total_uids": int}
      {"type": "progress", "account": str, "fetched": int, "total_uids": int}
      {"type": "done",     "account": str, "added": int, "skipped": int, "emails": list}

    Raises KeyError if `acc` lacks "username" or "password"; connection and
    login errors from imaplib propagate to the caller.
    """
    name = acc.get("name", acc.get("username", "?"))
    host = acc.get("host", "imap.gmail.com")
    port = int(acc.get("port", 993))
    use_ssl = acc.get("use_ssl", True)
    username = acc["username"]
    password = acc["password"]
    folder = acc.get("folder", "INBOX")
    since = _imap_date(datetime.now() - timedelta(days=days_back))

    emails: list[dict] = []
    skipped = 0
    conn = _connect(host, port, use_ssl)
    try:
        conn.login(username, password)
        conn.select(folder, readonly=True)

        # A dict keeps first-seen order while de-duplicating across terms.
        # These are message sequence numbers, valid only for this session.
        seen_uids: dict[bytes, None] = {}
        for term in _WIDE_TERMS:
            try:
                _, data = conn.search(None, f'(SUBJECT "{term}" SINCE "{since}")')
                for uid in (data[0] or b"").split():
                    seen_uids[uid] = None
            except Exception:
                # Best effort: one failing term must not abort the whole fetch.
                logger.debug("IMAP search for %r failed on %s", term, name, exc_info=True)

        # Look at up to 3x the limit so duplicates still leave room to fill it.
        uids = list(seen_uids)[: limit * 3]
        total = len(uids)
        yield {"type": "start", "account": name, "total_uids": total}

        for i, uid in enumerate(uids):
            if len(emails) >= limit:
                break
            if i % 5 == 0:
                yield {"type": "progress", "account": name,
                       "fetched": len(emails), "total_uids": total}
            try:
                _, raw_data = conn.fetch(uid, "(RFC822)")
                if not raw_data or not raw_data[0]:
                    continue
                msg = _email_lib.message_from_bytes(raw_data[0][1])
                entry = {
                    "subject": _decode_str(msg.get("Subject", "")),
                    "body": _extract_body(msg)[:800],
                    "from_addr": _decode_str(msg.get("From", "")),
                    "date": _decode_str(msg.get("Date", "")),
                    "account": name,
                }
                k = entry_key(entry)
                if k in known_keys:
                    skipped += 1
                else:
                    known_keys.add(k)
                    emails.append(entry)
            except Exception:
                logger.debug("Skipping message %r on %s", uid, name, exc_info=True)
                skipped += 1
    finally:
        _safe_logout(conn)

    yield {"type": "done", "account": name, "added": len(emails),
           "skipped": skipped, "emails": emails}


# ── tests/test_imap_fetch.py ─────────────────────────────────────────────────
"""Tests for imap_fetch — IMAP calls mocked."""
from unittest.mock import MagicMock, patch

_ACCOUNT = {
    "host": "imap.example.com", "port": 993, "use_ssl": True,
    "username": "u@example.com", "password": "secret", "folder": "INBOX",
    "name": "Test",
}


def _mock_imap(raw_msg: bytes, search_hits: bytes) -> MagicMock:
    """Build a fake IMAP connection that returns *raw_msg* for every fetch."""
    conn = MagicMock()
    conn.search.return_value = ("OK", [search_hits])
    conn.fetch.return_value = ("OK", [(b"1 (RFC822 {N})", raw_msg)])
    return conn


def test_test_connection_missing_fields():
    from app.imap_fetch import test_connection
    ok, msg, count = test_connection({"host": "", "username": "", "password": ""})
    assert ok is False
    assert "required" in msg.lower()
    assert count is None


def test_test_connection_invalid_port():
    from app.imap_fetch import test_connection
    ok, msg, count = test_connection(dict(_ACCOUNT, port="not-a-port"))
    assert ok is False
    assert "port" in msg.lower()
    assert count is None


def test_test_connection_success():
    from app.imap_fetch import test_connection

    mock_conn = MagicMock()
    mock_conn.select.return_value = ("OK", [b"42"])

    with patch("app.imap_fetch.imaplib.IMAP4_SSL", return_value=mock_conn):
        ok, msg, count = test_connection(_ACCOUNT)
    assert ok is True
    assert count == 42
    assert "42" in msg
    mock_conn.logout.assert_called_once()


def test_test_connection_folder_not_found():
    from app.imap_fetch import test_connection

    mock_conn = MagicMock()
    mock_conn.select.return_value = ("NO", [b"Mailbox doesn't exist"])

    with patch("app.imap_fetch.imaplib.IMAP4_SSL", return_value=mock_conn):
        ok, msg, count = test_connection(_ACCOUNT)
    assert ok is False
    assert count is None
    assert "INBOX" in msg
    mock_conn.logout.assert_called_once()


def test_test_connection_auth_failure():
    from app.imap_fetch import test_connection
    import imaplib

    with patch("app.imap_fetch.imaplib.IMAP4_SSL", side_effect=imaplib.IMAP4.error("auth failed")):
        ok, msg, count = test_connection(dict(_ACCOUNT, password="wrong"))
    assert ok is False
    assert count is None


def test_fetch_account_stream_yields_start_done():
    from app.imap_fetch import fetch_account_stream

    raw_msg = b"Subject: Test\r\nFrom: a@b.com\r\nDate: Mon, 1 Mar 2026 12:00:00 +0000\r\n\r\nHello"
    mock_conn = _mock_imap(raw_msg, b"1 2")

    with patch("app.imap_fetch.imaplib.IMAP4_SSL", return_value=mock_conn):
        events = list(fetch_account_stream(
            acc=_ACCOUNT, days_back=30, limit=10, known_keys=set(),
        ))

    assert events[0] == {"type": "start", "account": "Test", "total_uids": 2}
    done = events[-1]
    assert done["type"] == "done"
    # Both sequence numbers return the same message, so the second is a dupe.
    assert done["added"] == 1
    assert done["skipped"] == 1
    assert done["emails"][0]["subject"] == "Test"
    assert done["emails"][0]["body"] == "Hello"
    mock_conn.logout.assert_called_once()


def test_fetch_account_stream_logs_out_when_closed_early():
    from app.imap_fetch import fetch_account_stream

    raw_msg = b"Subject: Test\r\nFrom: a@b.com\r\n\r\nHello"
    mock_conn = _mock_imap(raw_msg, b"1")

    with patch("app.imap_fetch.imaplib.IMAP4_SSL", return_value=mock_conn):
        gen = fetch_account_stream(_ACCOUNT, 30, 10, set())
        assert next(gen)["type"] == "start"
        gen.close()
    mock_conn.logout.assert_called_once()


def test_fetch_account_stream_deduplicates():
    from app.imap_fetch import fetch_account_stream

    raw_msg = b"Subject: Dupe\r\nFrom: a@b.com\r\nDate: Mon, 1 Mar 2026 12:00:00 +0000\r\n\r\nBody"
    mock_conn = _mock_imap(raw_msg, b"1")

    known = set()
    with patch("app.imap_fetch.imaplib.IMAP4_SSL", return_value=mock_conn):
        events1 = list(fetch_account_stream(_ACCOUNT, 30, 10, known))
    done1 = next(e for e in events1 if e["type"] == "done")

    with patch("app.imap_fetch.imaplib.IMAP4_SSL", return_value=mock_conn):
        events2 = list(fetch_account_stream(_ACCOUNT, 30, 10, known))
    done2 = next(e for e in events2 if e["type"] == "done")
    assert done1["added"] == 1
    assert done2["added"] == 0
    assert done2["skipped"] == 1