feat: extract fetch routes and IMAP helpers into app/data/fetch.py

This commit is contained in:
pyr0ball 2026-05-01 21:57:31 -07:00
parent cbec776ef1
commit 2054866ff1
5 changed files with 354 additions and 212 deletions

View file

@ -156,16 +156,8 @@ app.include_router(imitate_router, prefix="/api/imitate")
from app.style import router as style_router
app.include_router(style_router, prefix="/api/style")
class AccountTestRequest(BaseModel):
account: dict
@app.post("/api/accounts/test")
def test_account(req: AccountTestRequest):
from app.imap_fetch import test_connection
ok, message, count = test_connection(req.account)
return {"ok": ok, "message": message, "count": count}
from app.data.fetch import router as fetch_router
app.include_router(fetch_router, prefix="/api")
from fastapi.responses import StreamingResponse
@ -385,48 +377,6 @@ def cancel_finetune():
return {"status": "cancelled"}
@app.get("/api/fetch/stream")
def fetch_stream(
accounts: str = Query(default=""),
days_back: int = Query(default=90, ge=1, le=365),
limit: int = Query(default=150, ge=1, le=1000),
mode: str = Query(default="wide"),
):
from app.imap_fetch import fetch_account_stream
from app.data.label import get_config
selected_names = {n.strip() for n in accounts.split(",") if n.strip()}
config = get_config() # reuse label module config logic
selected = [a for a in config["accounts"] if a.get("name") in selected_names]
def generate():
known_keys = {_item_id(x) for x in _read_jsonl(_queue_file())}
total_added = 0
for acc in selected:
try:
batch_emails: list[dict] = []
for event in fetch_account_stream(acc, days_back, limit, known_keys):
if event["type"] == "done":
batch_emails = event.pop("emails", [])
total_added += event["added"]
yield f"data: {json.dumps(event)}\n\n"
# Write new emails to queue after each account
if batch_emails:
existing = _read_jsonl(_queue_file())
_write_jsonl(_queue_file(), existing + batch_emails)
except Exception as exc:
error_event = {"type": "error", "account": acc.get("name", "?"),
"message": str(exc)}
yield f"data: {json.dumps(error_event)}\n\n"
queue_size = len(_read_jsonl(_queue_file()))
complete = {"type": "complete", "total_added": total_added, "queue_size": queue_size}
yield f"data: {json.dumps(complete)}\n\n"
return StreamingResponse(generate(), media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})
# Static SPA — MUST be last (catches all unmatched paths)
_DIST = _ROOT / "web" / "dist"

243
app/data/fetch.py Normal file
View file

@ -0,0 +1,243 @@
"""Avocet -- IMAP fetch utilities and fetch API routes.
All IMAP helper functions (from app/imap_fetch.py) plus the
/api/accounts/test and /api/fetch/stream endpoints.
"""
from __future__ import annotations
import email as _email_lib
import hashlib
import imaplib
import json
import yaml
from datetime import datetime, timedelta
from email.header import decode_header as _raw_decode
from pathlib import Path
from typing import Iterator
from fastapi import APIRouter, Query
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from app.utils import extract_body, read_jsonl, write_jsonl
_ROOT = Path(__file__).parent.parent.parent
_DATA_DIR: Path = _ROOT / "data"
_CONFIG_DIR: Path | None = None
router = APIRouter()
def set_data_dir(path: Path) -> None:
global _DATA_DIR
_DATA_DIR = path
def set_config_dir(path: Path | None) -> None:
global _CONFIG_DIR
_CONFIG_DIR = path
def _config_file() -> Path:
if _CONFIG_DIR is not None:
return _CONFIG_DIR / "label_tool.yaml"
return _ROOT / "config" / "label_tool.yaml"
def _queue_file() -> Path:
return _DATA_DIR / "email_label_queue.jsonl"
def _get_config_accounts() -> list[dict]:
f = _config_file()
if not f.exists():
return []
raw = yaml.safe_load(f.read_text(encoding="utf-8")) or {}
return raw.get("accounts", [])
# ── IMAP decode helpers ───────────────────────────────────────────────────────
def _decode_str(value: str | None) -> str:
if not value:
return ""
parts = _raw_decode(value)
out = []
for part, enc in parts:
if isinstance(part, bytes):
out.append(part.decode(enc or "utf-8", errors="replace"))
else:
out.append(str(part))
return " ".join(out).strip()
def entry_key(e: dict) -> str:
"""Stable MD5 content-hash for dedup — matches label_tool.py _entry_key."""
key = (e.get("subject", "") + (e.get("body", "") or "")[:100])
return hashlib.md5(key.encode("utf-8", errors="replace")).hexdigest()
# ── Wide search terms ────────────────────────────────────────────────────────
_WIDE_TERMS = [
"interview", "phone screen", "video call", "zoom link", "schedule a call",
"offer letter", "job offer", "offer of employment", "pleased to offer",
"unfortunately", "not moving forward", "other candidates", "regret to inform",
"no longer", "decided not to", "decided to go with",
"opportunity", "interested in your background", "reached out", "great fit",
"exciting role", "love to connect",
"assessment", "questionnaire", "culture fit", "culture-fit", "online assessment",
"application received", "thank you for applying", "application confirmation",
"you applied", "your application for",
"reschedule", "rescheduled", "new time", "moved to", "postponed", "new date",
"job digest", "jobs you may like", "recommended jobs", "jobs for you",
"new jobs", "job alert",
"came across your profile", "reaching out about", "great fit for a role",
"exciting opportunity",
"welcome to the team", "start date", "onboarding", "first day", "we're excited to have you",
"application", "recruiter", "recruiting", "hiring", "candidate",
]
# ── Public API ────────────────────────────────────────────────────────────────
def test_connection(acc: dict) -> tuple[bool, str, int | None]:
"""Connect, login, select folder. Returns (ok, human_message, message_count|None)."""
host = acc.get("host", "")
port = int(acc.get("port", 993))
use_ssl = acc.get("use_ssl", True)
username = acc.get("username", "")
password = acc.get("password", "")
folder = acc.get("folder", "INBOX")
if not host or not username or not password:
return False, "Host, username, and password are all required.", None
try:
conn = (imaplib.IMAP4_SSL if use_ssl else imaplib.IMAP4)(host, port)
conn.login(username, password)
_, data = conn.select(folder, readonly=True)
count_raw = data[0].decode() if data and data[0] else "0"
count = int(count_raw) if count_raw.isdigit() else 0
conn.logout()
return True, f"Connected — {count:,} message(s) in {folder}.", count
except Exception as exc:
return False, str(exc), None
def fetch_account_stream(
acc: dict,
days_back: int,
limit: int,
known_keys: set[str],
) -> Iterator[dict]:
"""Generator — yields progress dicts while fetching emails via IMAP.
Mutates `known_keys` in place for cross-account dedup within one fetch session.
Yields event dicts with "type" key:
{"type": "start", "account": str, "total_uids": int}
{"type": "progress", "account": str, "fetched": int, "total_uids": int}
{"type": "done", "account": str, "added": int, "skipped": int, "emails": list}
"""
name = acc.get("name", acc.get("username", "?"))
host = acc.get("host", "imap.gmail.com")
port = int(acc.get("port", 993))
use_ssl = acc.get("use_ssl", True)
username = acc["username"]
password = acc["password"]
folder = acc.get("folder", "INBOX")
since = (datetime.now() - timedelta(days=days_back)).strftime("%d-%b-%Y")
conn = (imaplib.IMAP4_SSL if use_ssl else imaplib.IMAP4)(host, port)
conn.login(username, password)
conn.select(folder, readonly=True)
seen_uids: dict[bytes, None] = {}
for term in _WIDE_TERMS:
try:
_, data = conn.search(None, f'(SUBJECT "{term}" SINCE "{since}")')
for uid in (data[0] or b"").split():
seen_uids[uid] = None
except Exception:
pass
uids = list(seen_uids.keys())[: limit * 3]
yield {"type": "start", "account": name, "total_uids": len(uids)}
emails: list[dict] = []
skipped = 0
for i, uid in enumerate(uids):
if len(emails) >= limit:
break
if i % 5 == 0:
yield {"type": "progress", "account": name, "fetched": len(emails), "total_uids": len(uids)}
try:
_, raw_data = conn.fetch(uid, "(RFC822)")
if not raw_data or not raw_data[0]:
continue
msg = _email_lib.message_from_bytes(raw_data[0][1])
subj = _decode_str(msg.get("Subject", ""))
from_addr = _decode_str(msg.get("From", ""))
date = _decode_str(msg.get("Date", ""))
body = extract_body(msg)[:800]
entry = {"subject": subj, "body": body, "from_addr": from_addr,
"date": date, "account": name}
k = entry_key(entry)
if k not in known_keys:
known_keys.add(k)
emails.append(entry)
else:
skipped += 1
except Exception:
skipped += 1
try:
conn.logout()
except Exception:
pass
yield {"type": "done", "account": name, "added": len(emails), "skipped": skipped,
"emails": emails}
class AccountTestRequest(BaseModel):
account: dict
@router.post("/accounts/test")
def test_account_route(req: AccountTestRequest) -> dict:
ok, message, count = test_connection(req.account)
return {"ok": ok, "message": message, "count": count}
@router.get("/fetch/stream")
def fetch_stream(
accounts: str = Query(default=""),
days_back: int = Query(default=90, ge=1, le=365),
limit: int = Query(default=150, ge=1, le=1000),
mode: str = Query(default="wide"),
) -> StreamingResponse:
selected_names = {n.strip() for n in accounts.split(",") if n.strip()}
all_accounts = _get_config_accounts()
selected = [a for a in all_accounts if a.get("name") in selected_names]
def generate():
known_keys = {entry_key(x) for x in read_jsonl(_queue_file())}
total_added = 0
for acc in selected:
try:
batch_emails: list[dict] = []
for event in fetch_account_stream(acc, days_back, limit, known_keys):
if event["type"] == "done":
batch_emails = event.pop("emails", [])
total_added += event["added"]
yield f"data: {json.dumps(event)}\n\n"
if batch_emails:
existing = read_jsonl(_queue_file())
write_jsonl(_queue_file(), existing + batch_emails)
except Exception as exc:
yield f"data: {json.dumps({'type': 'error', 'account': acc.get('name', '?'), 'message': str(exc)})}\n\n"
queue_size = len(read_jsonl(_queue_file()))
yield f"data: {json.dumps({'type': 'complete', 'total_added': total_added, 'queue_size': queue_size})}\n\n"
return StreamingResponse(generate(), media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})

View file

@ -1,158 +1,9 @@
"""Avocet — IMAP fetch utilities.
Shared between app/api.py (FastAPI SSE endpoint) and the label UI.
No Streamlit imports here stdlib + imaplib only.
"""
from __future__ import annotations
import email as _email_lib
import hashlib
import imaplib
from datetime import datetime, timedelta
from email.header import decode_header as _raw_decode
from typing import Any, Iterator
from app.utils import extract_body, strip_html # noqa: F401 (strip_html re-exported for callers)
# ── IMAP decode helpers ───────────────────────────────────────────────────────
def _decode_str(value: str | None) -> str:
if not value:
return ""
parts = _raw_decode(value)
out = []
for part, enc in parts:
if isinstance(part, bytes):
out.append(part.decode(enc or "utf-8", errors="replace"))
else:
out.append(str(part))
return " ".join(out).strip()
def entry_key(e: dict) -> str:
"""Stable MD5 content-hash for dedup — matches label_tool.py _entry_key."""
key = (e.get("subject", "") + (e.get("body", "") or "")[:100])
return hashlib.md5(key.encode("utf-8", errors="replace")).hexdigest()
# ── Wide search terms ────────────────────────────────────────────────────────
_WIDE_TERMS = [
"interview", "phone screen", "video call", "zoom link", "schedule a call",
"offer letter", "job offer", "offer of employment", "pleased to offer",
"unfortunately", "not moving forward", "other candidates", "regret to inform",
"no longer", "decided not to", "decided to go with",
"opportunity", "interested in your background", "reached out", "great fit",
"exciting role", "love to connect",
"assessment", "questionnaire", "culture fit", "culture-fit", "online assessment",
"application received", "thank you for applying", "application confirmation",
"you applied", "your application for",
"reschedule", "rescheduled", "new time", "moved to", "postponed", "new date",
"job digest", "jobs you may like", "recommended jobs", "jobs for you",
"new jobs", "job alert",
"came across your profile", "reaching out about", "great fit for a role",
"exciting opportunity",
"welcome to the team", "start date", "onboarding", "first day", "we're excited to have you",
"application", "recruiter", "recruiting", "hiring", "candidate",
]
# ── Public API ────────────────────────────────────────────────────────────────
def test_connection(acc: dict) -> tuple[bool, str, int | None]:
"""Connect, login, select folder. Returns (ok, human_message, message_count|None)."""
host = acc.get("host", "")
port = int(acc.get("port", 993))
use_ssl = acc.get("use_ssl", True)
username = acc.get("username", "")
password = acc.get("password", "")
folder = acc.get("folder", "INBOX")
if not host or not username or not password:
return False, "Host, username, and password are all required.", None
try:
conn = (imaplib.IMAP4_SSL if use_ssl else imaplib.IMAP4)(host, port)
conn.login(username, password)
_, data = conn.select(folder, readonly=True)
count_raw = data[0].decode() if data and data[0] else "0"
count = int(count_raw) if count_raw.isdigit() else 0
conn.logout()
return True, f"Connected — {count:,} message(s) in {folder}.", count
except Exception as exc:
return False, str(exc), None
def fetch_account_stream(
acc: dict,
days_back: int,
limit: int,
known_keys: set[str],
) -> Iterator[dict]:
"""Generator — yields progress dicts while fetching emails via IMAP.
Mutates `known_keys` in place for cross-account dedup within one fetch session.
Yields event dicts with "type" key:
{"type": "start", "account": str, "total_uids": int}
{"type": "progress", "account": str, "fetched": int, "total_uids": int}
{"type": "done", "account": str, "added": int, "skipped": int, "emails": list}
"""
name = acc.get("name", acc.get("username", "?"))
host = acc.get("host", "imap.gmail.com")
port = int(acc.get("port", 993))
use_ssl = acc.get("use_ssl", True)
username = acc["username"]
password = acc["password"]
folder = acc.get("folder", "INBOX")
since = (datetime.now() - timedelta(days=days_back)).strftime("%d-%b-%Y")
conn = (imaplib.IMAP4_SSL if use_ssl else imaplib.IMAP4)(host, port)
conn.login(username, password)
conn.select(folder, readonly=True)
seen_uids: dict[bytes, None] = {}
for term in _WIDE_TERMS:
try:
_, data = conn.search(None, f'(SUBJECT "{term}" SINCE "{since}")')
for uid in (data[0] or b"").split():
seen_uids[uid] = None
except Exception:
pass
uids = list(seen_uids.keys())[: limit * 3]
yield {"type": "start", "account": name, "total_uids": len(uids)}
emails: list[dict] = []
skipped = 0
for i, uid in enumerate(uids):
if len(emails) >= limit:
break
if i % 5 == 0:
yield {"type": "progress", "account": name, "fetched": len(emails), "total_uids": len(uids)}
try:
_, raw_data = conn.fetch(uid, "(RFC822)")
if not raw_data or not raw_data[0]:
continue
msg = _email_lib.message_from_bytes(raw_data[0][1])
subj = _decode_str(msg.get("Subject", ""))
from_addr = _decode_str(msg.get("From", ""))
date = _decode_str(msg.get("Date", ""))
body = extract_body(msg)[:800]
entry = {"subject": subj, "body": body, "from_addr": from_addr,
"date": date, "account": name}
k = entry_key(entry)
if k not in known_keys:
known_keys.add(k)
emails.append(entry)
else:
skipped += 1
except Exception:
skipped += 1
try:
conn.logout()
except Exception:
pass
yield {"type": "done", "account": name, "added": len(emails), "skipped": skipped,
"emails": emails}
"""Backward-compat shim -- logic moved to app/data/fetch.py."""
import imaplib # noqa: F401 -- re-exported so existing patch("app.imap_fetch.imaplib...") calls still work
from app.data.fetch import ( # noqa: F401
entry_key,
fetch_account_stream,
test_connection,
_decode_str,
_WIDE_TERMS,
)

View file

@ -8,10 +8,13 @@ from app import api as api_module # noqa: F401
def reset_globals(tmp_path):
from app import api
from app.data import label as label_module
from app.data import fetch as fetch_module
api.set_data_dir(tmp_path)
label_module.set_data_dir(tmp_path)
label_module.set_config_dir(tmp_path)
label_module.reset_last_action()
fetch_module.set_data_dir(tmp_path)
fetch_module.set_config_dir(tmp_path)
yield
label_module.reset_last_action()
@ -273,7 +276,7 @@ def test_account_test_success(client):
from unittest.mock import MagicMock, patch
mock_conn = MagicMock()
mock_conn.select.return_value = ("OK", [b"99"])
with patch("app.imap_fetch.imaplib.IMAP4_SSL", return_value=mock_conn):
with patch("app.data.fetch.imaplib.IMAP4_SSL", return_value=mock_conn):
r = client.post("/api/accounts/test", json={"account": {
"host": "imap.example.com", "port": 993, "use_ssl": True,
"username": "u@example.com", "password": "pw", "folder": "INBOX",
@ -322,7 +325,7 @@ def test_fetch_stream_with_mock_imap(client, config_dir, data_dir):
mock_conn.search.return_value = ("OK", [b"1"])
mock_conn.fetch.return_value = ("OK", [(b"1 (RFC822 {N})", raw_msg)])
with patch("app.imap_fetch.imaplib.IMAP4_SSL", return_value=mock_conn):
with patch("app.data.fetch.imaplib.IMAP4_SSL", return_value=mock_conn):
r = client.get("/api/fetch/stream?accounts=Mock&days_back=30&limit=50")
assert r.status_code == 200

95
tests/test_data_fetch.py Normal file
View file

@ -0,0 +1,95 @@
"""Tests for app/data/fetch.py"""
import json
import yaml
import pytest
from fastapi.testclient import TestClient
from unittest.mock import MagicMock, patch
@pytest.fixture(autouse=True)
def reset_globals(tmp_path):
from app.data import fetch as fetch_module
fetch_module.set_data_dir(tmp_path)
fetch_module.set_config_dir(tmp_path)
yield
@pytest.fixture
def client():
from app.api import app
return TestClient(app)
def _parse_sse(content: bytes) -> list[dict]:
events = []
for line in content.decode().splitlines():
if line.startswith("data: "):
events.append(json.loads(line[6:]))
return events
def test_account_test_missing_fields(client):
r = client.post("/api/accounts/test",
json={"account": {"host": "", "username": "", "password": ""}})
assert r.status_code == 200
data = r.json()
assert data["ok"] is False
assert "required" in data["message"].lower()
def test_account_test_success(client):
mock_conn = MagicMock()
mock_conn.select.return_value = ("OK", [b"99"])
with patch("app.data.fetch.imaplib.IMAP4_SSL", return_value=mock_conn):
r = client.post("/api/accounts/test", json={"account": {
"host": "imap.example.com", "port": 993, "use_ssl": True,
"username": "u@example.com", "password": "pw", "folder": "INBOX",
}})
assert r.status_code == 200
data = r.json()
assert data["ok"] is True
assert data["count"] == 99
def test_fetch_stream_no_accounts_configured(client, tmp_path):
r = client.get("/api/fetch/stream?accounts=NoSuchAccount&days_back=30&limit=10")
assert r.status_code == 200
events = _parse_sse(r.content)
complete = next((e for e in events if e["type"] == "complete"), None)
assert complete is not None
assert complete["total_added"] == 0
def test_fetch_stream_with_mock_imap(client, tmp_path):
from app.data import fetch as fetch_module
fetch_module.set_config_dir(tmp_path)
cfg = {"accounts": [{"name": "Mock", "host": "h", "port": 993, "use_ssl": True,
"username": "u", "password": "p", "folder": "INBOX",
"days_back": 30}], "max_per_account": 50}
(tmp_path / "label_tool.yaml").write_text(yaml.dump(cfg))
raw_msg = (b"Subject: Interview\r\nFrom: a@b.com\r\n"
b"Date: Mon, 1 Mar 2026 12:00:00 +0000\r\n\r\nBody")
mock_conn = MagicMock()
mock_conn.search.return_value = ("OK", [b"1"])
mock_conn.fetch.return_value = ("OK", [(b"1 (RFC822 {N})", raw_msg)])
with patch("app.data.fetch.imaplib.IMAP4_SSL", return_value=mock_conn):
r = client.get("/api/fetch/stream?accounts=Mock&days_back=30&limit=50")
assert r.status_code == 200
events = _parse_sse(r.content)
types = [e["type"] for e in events]
assert "start" in types
assert "done" in types
assert "complete" in types
def test_entry_key_deterministic():
from app.data.fetch import entry_key
e = {"subject": "Test", "body": "Hello world"}
assert entry_key(e) == entry_key(e)
def test_entry_key_differs_by_subject():
from app.data.fetch import entry_key
a = {"subject": "A", "body": "same body"}
b = {"subject": "B", "body": "same body"}
assert entry_key(a) != entry_key(b)