feat: extract IMAP logic to app/imap_fetch.py for reuse by API

This commit is contained in:
pyr0ball 2026-03-04 11:42:22 -08:00
parent 8ec2dfddee
commit 1d1f25641b
2 changed files with 300 additions and 0 deletions

214
app/imap_fetch.py Normal file
View file

@ -0,0 +1,214 @@
"""Avocet — IMAP fetch utilities.
Shared between app/api.py (FastAPI SSE endpoint) and app/label_tool.py (Streamlit).
No Streamlit imports here stdlib + imaplib only.
"""
from __future__ import annotations
import email as _email_lib
import hashlib
import imaplib
import re
from datetime import datetime, timedelta
from email.header import decode_header as _raw_decode
from html.parser import HTMLParser
from typing import Any, Iterator
# ── HTML → plain text ────────────────────────────────────────────────────────
class _TextExtractor(HTMLParser):
def __init__(self):
super().__init__()
self._parts: list[str] = []
def handle_data(self, data: str) -> None:
stripped = data.strip()
if stripped:
self._parts.append(stripped)
def get_text(self) -> str:
return " ".join(self._parts)
def strip_html(html_str: str) -> str:
try:
ex = _TextExtractor()
ex.feed(html_str)
return ex.get_text()
except Exception:
return re.sub(r"<[^>]+>", " ", html_str).strip()
# ── IMAP decode helpers ───────────────────────────────────────────────────────
def _decode_str(value: str | None) -> str:
if not value:
return ""
parts = _raw_decode(value)
out = []
for part, enc in parts:
if isinstance(part, bytes):
out.append(part.decode(enc or "utf-8", errors="replace"))
else:
out.append(str(part))
return " ".join(out).strip()
def _extract_body(msg: Any) -> str:
if msg.is_multipart():
html_fallback: str | None = None
for part in msg.walk():
ct = part.get_content_type()
if ct == "text/plain":
try:
charset = part.get_content_charset() or "utf-8"
return part.get_payload(decode=True).decode(charset, errors="replace")
except Exception:
pass
elif ct == "text/html" and html_fallback is None:
try:
charset = part.get_content_charset() or "utf-8"
raw = part.get_payload(decode=True).decode(charset, errors="replace")
html_fallback = strip_html(raw)
except Exception:
pass
return html_fallback or ""
else:
try:
charset = msg.get_content_charset() or "utf-8"
raw = msg.get_payload(decode=True).decode(charset, errors="replace")
if msg.get_content_type() == "text/html":
return strip_html(raw)
return raw
except Exception:
pass
return ""
def entry_key(e: dict) -> str:
"""Stable MD5 content-hash for dedup — matches label_tool.py _entry_key."""
key = (e.get("subject", "") + (e.get("body", "") or "")[:100])
return hashlib.md5(key.encode("utf-8", errors="replace")).hexdigest()
# ── Wide search terms ────────────────────────────────────────────────────────
_WIDE_TERMS = [
"interview", "phone screen", "video call", "zoom link", "schedule a call",
"offer letter", "job offer", "offer of employment", "pleased to offer",
"unfortunately", "not moving forward", "other candidates", "regret to inform",
"no longer", "decided not to", "decided to go with",
"opportunity", "interested in your background", "reached out", "great fit",
"exciting role", "love to connect",
"assessment", "questionnaire", "culture fit", "culture-fit", "online assessment",
"application received", "thank you for applying", "application confirmation",
"you applied", "your application for",
"reschedule", "rescheduled", "new time", "moved to", "postponed", "new date",
"job digest", "jobs you may like", "recommended jobs", "jobs for you",
"new jobs", "job alert",
"came across your profile", "reaching out about", "great fit for a role",
"exciting opportunity",
"welcome to the team", "start date", "onboarding", "first day", "we're excited to have you",
"application", "recruiter", "recruiting", "hiring", "candidate",
]
# ── Public API ────────────────────────────────────────────────────────────────
def test_connection(acc: dict) -> tuple[bool, str, int | None]:
"""Connect, login, select folder. Returns (ok, human_message, message_count|None)."""
host = acc.get("host", "")
port = int(acc.get("port", 993))
use_ssl = acc.get("use_ssl", True)
username = acc.get("username", "")
password = acc.get("password", "")
folder = acc.get("folder", "INBOX")
if not host or not username or not password:
return False, "Host, username, and password are all required.", None
try:
conn = (imaplib.IMAP4_SSL if use_ssl else imaplib.IMAP4)(host, port)
conn.login(username, password)
_, data = conn.select(folder, readonly=True)
count_raw = data[0].decode() if data and data[0] else "0"
count = int(count_raw) if count_raw.isdigit() else 0
conn.logout()
return True, f"Connected — {count:,} message(s) in {folder}.", count
except Exception as exc:
return False, str(exc), None
def fetch_account_stream(
acc: dict,
days_back: int,
limit: int,
known_keys: set[str],
) -> Iterator[dict]:
"""Generator — yields progress dicts while fetching emails via IMAP.
Mutates `known_keys` in place for cross-account dedup within one fetch session.
Yields event dicts with "type" key:
{"type": "start", "account": str, "total_uids": int}
{"type": "progress", "account": str, "fetched": int, "total_uids": int}
{"type": "done", "account": str, "added": int, "skipped": int, "emails": list}
"""
name = acc.get("name", acc.get("username", "?"))
host = acc.get("host", "imap.gmail.com")
port = int(acc.get("port", 993))
use_ssl = acc.get("use_ssl", True)
username = acc["username"]
password = acc["password"]
folder = acc.get("folder", "INBOX")
since = (datetime.now() - timedelta(days=days_back)).strftime("%d-%b-%Y")
conn = (imaplib.IMAP4_SSL if use_ssl else imaplib.IMAP4)(host, port)
conn.login(username, password)
conn.select(folder, readonly=True)
seen_uids: dict[bytes, None] = {}
for term in _WIDE_TERMS:
try:
_, data = conn.search(None, f'(SUBJECT "{term}" SINCE "{since}")')
for uid in (data[0] or b"").split():
seen_uids[uid] = None
except Exception:
pass
uids = list(seen_uids.keys())[: limit * 3]
yield {"type": "start", "account": name, "total_uids": len(uids)}
emails: list[dict] = []
skipped = 0
for i, uid in enumerate(uids):
if len(emails) >= limit:
break
if i % 5 == 0:
yield {"type": "progress", "account": name, "fetched": len(emails), "total_uids": len(uids)}
try:
_, raw_data = conn.fetch(uid, "(RFC822)")
if not raw_data or not raw_data[0]:
continue
msg = _email_lib.message_from_bytes(raw_data[0][1])
subj = _decode_str(msg.get("Subject", ""))
from_addr = _decode_str(msg.get("From", ""))
date = _decode_str(msg.get("Date", ""))
body = _extract_body(msg)[:800]
entry = {"subject": subj, "body": body, "from_addr": from_addr,
"date": date, "account": name}
k = entry_key(entry)
if k not in known_keys:
known_keys.add(k)
emails.append(entry)
else:
skipped += 1
except Exception:
skipped += 1
try:
conn.logout()
except Exception:
pass
yield {"type": "done", "account": name, "added": len(emails), "skipped": skipped,
"emails": emails}

86
tests/test_imap_fetch.py Normal file
View file

@ -0,0 +1,86 @@
"""Tests for imap_fetch — IMAP calls mocked."""
from unittest.mock import MagicMock, patch
def test_test_connection_missing_fields():
from app.imap_fetch import test_connection
ok, msg, count = test_connection({"host": "", "username": "", "password": ""})
assert ok is False
assert "required" in msg.lower()
def test_test_connection_success():
from app.imap_fetch import test_connection
mock_conn = MagicMock()
mock_conn.select.return_value = ("OK", [b"42"])
with patch("app.imap_fetch.imaplib.IMAP4_SSL", return_value=mock_conn):
ok, msg, count = test_connection({
"host": "imap.example.com", "port": 993, "use_ssl": True,
"username": "u@example.com", "password": "secret", "folder": "INBOX",
})
assert ok is True
assert count == 42
assert "42" in msg
def test_test_connection_auth_failure():
from app.imap_fetch import test_connection
import imaplib
with patch("app.imap_fetch.imaplib.IMAP4_SSL", side_effect=imaplib.IMAP4.error("auth failed")):
ok, msg, count = test_connection({
"host": "imap.example.com", "port": 993, "use_ssl": True,
"username": "u@example.com", "password": "wrong", "folder": "INBOX",
})
assert ok is False
assert count is None
def test_fetch_account_stream_yields_start_done(tmp_path):
from app.imap_fetch import fetch_account_stream
mock_conn = MagicMock()
mock_conn.search.return_value = ("OK", [b"1 2"])
raw_msg = b"Subject: Test\r\nFrom: a@b.com\r\nDate: Mon, 1 Mar 2026 12:00:00 +0000\r\n\r\nHello"
mock_conn.fetch.return_value = ("OK", [(b"1 (RFC822 {N})", raw_msg)])
with patch("app.imap_fetch.imaplib.IMAP4_SSL", return_value=mock_conn):
events = list(fetch_account_stream(
acc={"host": "h", "port": 993, "use_ssl": True,
"username": "u", "password": "p", "folder": "INBOX", "name": "Test"},
days_back=30, limit=10, known_keys=set(),
))
types = [e["type"] for e in events]
assert "start" in types
assert "done" in types
def test_fetch_account_stream_deduplicates(tmp_path):
from app.imap_fetch import fetch_account_stream
raw_msg = b"Subject: Dupe\r\nFrom: a@b.com\r\nDate: Mon, 1 Mar 2026 12:00:00 +0000\r\n\r\nBody"
mock_conn = MagicMock()
mock_conn.search.return_value = ("OK", [b"1"])
mock_conn.fetch.return_value = ("OK", [(b"1 (RFC822 {N})", raw_msg)])
known = set()
with patch("app.imap_fetch.imaplib.IMAP4_SSL", return_value=mock_conn):
events1 = list(fetch_account_stream(
{"host": "h", "port": 993, "use_ssl": True, "username": "u",
"password": "p", "folder": "INBOX", "name": "T"},
30, 10, known,
))
done1 = next(e for e in events1 if e["type"] == "done")
with patch("app.imap_fetch.imaplib.IMAP4_SSL", return_value=mock_conn):
events2 = list(fetch_account_stream(
{"host": "h", "port": 993, "use_ssl": True, "username": "u",
"password": "p", "folder": "INBOX", "name": "T"},
30, 10, known,
))
done2 = next(e for e in events2 if e["type"] == "done")
assert done1["added"] == 1
assert done2["added"] == 0