diff --git a/app/api.py b/app/api.py index c08c2e5..1ad4129 100644 --- a/app/api.py +++ b/app/api.py @@ -156,16 +156,8 @@ app.include_router(imitate_router, prefix="/api/imitate") from app.style import router as style_router app.include_router(style_router, prefix="/api/style") - -class AccountTestRequest(BaseModel): - account: dict - - -@app.post("/api/accounts/test") -def test_account(req: AccountTestRequest): - from app.imap_fetch import test_connection - ok, message, count = test_connection(req.account) - return {"ok": ok, "message": message, "count": count} +from app.data.fetch import router as fetch_router +app.include_router(fetch_router, prefix="/api") from fastapi.responses import StreamingResponse @@ -385,48 +377,6 @@ def cancel_finetune(): return {"status": "cancelled"} -@app.get("/api/fetch/stream") -def fetch_stream( - accounts: str = Query(default=""), - days_back: int = Query(default=90, ge=1, le=365), - limit: int = Query(default=150, ge=1, le=1000), - mode: str = Query(default="wide"), -): - from app.imap_fetch import fetch_account_stream - from app.data.label import get_config - - selected_names = {n.strip() for n in accounts.split(",") if n.strip()} - config = get_config() # reuse label module config logic - selected = [a for a in config["accounts"] if a.get("name") in selected_names] - - def generate(): - known_keys = {_item_id(x) for x in _read_jsonl(_queue_file())} - total_added = 0 - - for acc in selected: - try: - batch_emails: list[dict] = [] - for event in fetch_account_stream(acc, days_back, limit, known_keys): - if event["type"] == "done": - batch_emails = event.pop("emails", []) - total_added += event["added"] - yield f"data: {json.dumps(event)}\n\n" - # Write new emails to queue after each account - if batch_emails: - existing = _read_jsonl(_queue_file()) - _write_jsonl(_queue_file(), existing + batch_emails) - except Exception as exc: - error_event = {"type": "error", "account": acc.get("name", "?"), - "message": str(exc)} - yield f"data: {json.dumps(error_event)}\n\n" - - queue_size = len(_read_jsonl(_queue_file())) - complete = {"type": "complete", "total_added": total_added, "queue_size": queue_size} - yield f"data: {json.dumps(complete)}\n\n" - - return StreamingResponse(generate(), media_type="text/event-stream", - headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}) - # Static SPA — MUST be last (catches all unmatched paths) _DIST = _ROOT / "web" / "dist" diff --git a/app/data/fetch.py b/app/data/fetch.py new file mode 100644 index 0000000..05a10c8 --- /dev/null +++ b/app/data/fetch.py @@ -0,0 +1,243 @@ +"""Avocet -- IMAP fetch utilities and fetch API routes. + +All IMAP helper functions (from app/imap_fetch.py) plus the +/api/accounts/test and /api/fetch/stream endpoints. +""" +from __future__ import annotations + +import email as _email_lib +import hashlib +import imaplib +import json +import yaml +from datetime import datetime, timedelta +from email.header import decode_header as _raw_decode +from pathlib import Path +from typing import Iterator + +from fastapi import APIRouter, Query +from fastapi.responses import StreamingResponse +from pydantic import BaseModel + +from app.utils import extract_body, read_jsonl, write_jsonl + +_ROOT = Path(__file__).parent.parent.parent +_DATA_DIR: Path = _ROOT / "data" +_CONFIG_DIR: Path | None = None + +router = APIRouter() + + +def set_data_dir(path: Path) -> None: + global _DATA_DIR + _DATA_DIR = path + + +def set_config_dir(path: Path | None) -> None: + global _CONFIG_DIR + _CONFIG_DIR = path + + +def _config_file() -> Path: + if _CONFIG_DIR is not None: + return _CONFIG_DIR / "label_tool.yaml" + return _ROOT / "config" / "label_tool.yaml" + + +def _queue_file() -> Path: + return _DATA_DIR / "email_label_queue.jsonl" + + +def _get_config_accounts() -> list[dict]: + f = _config_file() + if not f.exists(): + return [] + raw = yaml.safe_load(f.read_text(encoding="utf-8")) or {} + return raw.get("accounts", []) + + +# ── IMAP decode helpers ─────────────────────────────────────────────────────── + +def _decode_str(value: str | None) -> str: + if not value: + return "" + parts = _raw_decode(value) + out = [] + for part, enc in parts: + if isinstance(part, bytes): + out.append(part.decode(enc or "utf-8", errors="replace")) + else: + out.append(str(part)) + return " ".join(out).strip() + + +def entry_key(e: dict) -> str: + """Stable MD5 content-hash for dedup — matches label_tool.py _entry_key.""" + key = (e.get("subject", "") + (e.get("body", "") or "")[:100]) + return hashlib.md5(key.encode("utf-8", errors="replace")).hexdigest() + + +# ── Wide search terms ──────────────────────────────────────────────────────── + +_WIDE_TERMS = [ + "interview", "phone screen", "video call", "zoom link", "schedule a call", + "offer letter", "job offer", "offer of employment", "pleased to offer", + "unfortunately", "not moving forward", "other candidates", "regret to inform", + "no longer", "decided not to", "decided to go with", + "opportunity", "interested in your background", "reached out", "great fit", + "exciting role", "love to connect", + "assessment", "questionnaire", "culture fit", "culture-fit", "online assessment", + "application received", "thank you for applying", "application confirmation", + "you applied", "your application for", + "reschedule", "rescheduled", "new time", "moved to", "postponed", "new date", + "job digest", "jobs you may like", "recommended jobs", "jobs for you", + "new jobs", "job alert", + "came across your profile", "reaching out about", "great fit for a role", + "exciting opportunity", + "welcome to the team", "start date", "onboarding", "first day", "we're excited to have you", + "application", "recruiter", "recruiting", "hiring", "candidate", +] + + +# ── Public API ──────────────────────────────────────────────────────────────── + +def test_connection(acc: dict) -> tuple[bool, str, int | None]: + """Connect, login, select folder. Returns (ok, human_message, message_count|None).""" + host = acc.get("host", "") + port = int(acc.get("port", 993)) + use_ssl = acc.get("use_ssl", True) + username = acc.get("username", "") + password = acc.get("password", "") + folder = acc.get("folder", "INBOX") + if not host or not username or not password: + return False, "Host, username, and password are all required.", None + try: + conn = (imaplib.IMAP4_SSL if use_ssl else imaplib.IMAP4)(host, port) + conn.login(username, password) + _, data = conn.select(folder, readonly=True) + count_raw = data[0].decode() if data and data[0] else "0" + count = int(count_raw) if count_raw.isdigit() else 0 + conn.logout() + return True, f"Connected — {count:,} message(s) in {folder}.", count + except Exception as exc: + return False, str(exc), None + + +def fetch_account_stream( + acc: dict, + days_back: int, + limit: int, + known_keys: set[str], +) -> Iterator[dict]: + """Generator — yields progress dicts while fetching emails via IMAP. + + Mutates `known_keys` in place for cross-account dedup within one fetch session. + + Yields event dicts with "type" key: + {"type": "start", "account": str, "total_uids": int} + {"type": "progress", "account": str, "fetched": int, "total_uids": int} + {"type": "done", "account": str, "added": int, "skipped": int, "emails": list} + """ + name = acc.get("name", acc.get("username", "?")) + host = acc.get("host", "imap.gmail.com") + port = int(acc.get("port", 993)) + use_ssl = acc.get("use_ssl", True) + username = acc["username"] + password = acc["password"] + folder = acc.get("folder", "INBOX") + since = (datetime.now() - timedelta(days=days_back)).strftime("%d-%b-%Y") + + conn = (imaplib.IMAP4_SSL if use_ssl else imaplib.IMAP4)(host, port) + conn.login(username, password) + conn.select(folder, readonly=True) + + seen_uids: dict[bytes, None] = {} + for term in _WIDE_TERMS: + try: + _, data = conn.search(None, f'(SUBJECT "{term}" SINCE "{since}")') + for uid in (data[0] or b"").split(): + seen_uids[uid] = None + except Exception: + pass + + uids = list(seen_uids.keys())[: limit * 3] + yield {"type": "start", "account": name, "total_uids": len(uids)} + + emails: list[dict] = [] + skipped = 0 + for i, uid in enumerate(uids): + if len(emails) >= limit: + break + if i % 5 == 0: + yield {"type": "progress", "account": name, "fetched": len(emails), "total_uids": len(uids)} + try: + _, raw_data = conn.fetch(uid, "(RFC822)") + if not raw_data or not raw_data[0]: + continue + msg = _email_lib.message_from_bytes(raw_data[0][1]) + subj = _decode_str(msg.get("Subject", "")) + from_addr = _decode_str(msg.get("From", "")) + date = _decode_str(msg.get("Date", "")) + body = extract_body(msg)[:800] + entry = {"subject": subj, "body": body, "from_addr": from_addr, + "date": date, "account": name} + k = entry_key(entry) + if k not in known_keys: + known_keys.add(k) + emails.append(entry) + else: + skipped += 1 + except Exception: + skipped += 1 + + try: + conn.logout() + except Exception: + pass + + yield {"type": "done", "account": name, "added": len(emails), "skipped": skipped, + "emails": emails} + + +class AccountTestRequest(BaseModel): + account: dict + + +@router.post("/accounts/test") +def test_account_route(req: AccountTestRequest) -> dict: + ok, message, count = test_connection(req.account) + return {"ok": ok, "message": message, "count": count} + + +@router.get("/fetch/stream") +def fetch_stream( + accounts: str = Query(default=""), + days_back: int = Query(default=90, ge=1, le=365), + limit: int = Query(default=150, ge=1, le=1000), + mode: str = Query(default="wide"), +) -> StreamingResponse: + selected_names = {n.strip() for n in accounts.split(",") if n.strip()} + all_accounts = _get_config_accounts() + selected = [a for a in all_accounts if a.get("name") in selected_names] + + def generate(): + known_keys = {entry_key(x) for x in read_jsonl(_queue_file())} + total_added = 0 + for acc in selected: + try: + batch_emails: list[dict] = [] + for event in fetch_account_stream(acc, days_back, limit, known_keys): + if event["type"] == "done": + batch_emails = event.pop("emails", []) + total_added += event["added"] + yield f"data: {json.dumps(event)}\n\n" + if batch_emails: + existing = read_jsonl(_queue_file()) + write_jsonl(_queue_file(), existing + batch_emails) + except Exception as exc: + yield f"data: {json.dumps({'type': 'error', 'account': acc.get('name', '?'), 'message': str(exc)})}\n\n" + queue_size = len(read_jsonl(_queue_file())) + yield f"data: {json.dumps({'type': 'complete', 'total_added': total_added, 'queue_size': queue_size})}\n\n" + + return StreamingResponse(generate(), media_type="text/event-stream", + headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}) diff --git a/app/imap_fetch.py b/app/imap_fetch.py index 1e15119..5ed755f 100644 --- a/app/imap_fetch.py +++ b/app/imap_fetch.py @@ -1,158 +1,9 @@ -"""Avocet — IMAP fetch utilities. - -Shared between app/api.py (FastAPI SSE endpoint) and the label UI. -No Streamlit imports here — stdlib + imaplib only. -""" -from __future__ import annotations - -import email as _email_lib -import hashlib -import imaplib -from datetime import datetime, timedelta -from email.header import decode_header as _raw_decode -from typing import Any, Iterator - -from app.utils import extract_body, strip_html # noqa: F401 (strip_html re-exported for callers) - - -# ── IMAP decode helpers ─────────────────────────────────────────────────────── - -def _decode_str(value: str | None) -> str: - if not value: - return "" - parts = _raw_decode(value) - out = [] - for part, enc in parts: - if isinstance(part, bytes): - out.append(part.decode(enc or "utf-8", errors="replace")) - else: - out.append(str(part)) - return " ".join(out).strip() - - -def entry_key(e: dict) -> str: - """Stable MD5 content-hash for dedup — matches label_tool.py _entry_key.""" - key = (e.get("subject", "") + (e.get("body", "") or "")[:100]) - return hashlib.md5(key.encode("utf-8", errors="replace")).hexdigest() - - -# ── Wide search terms ──────────────────────────────────────────────────────── - -_WIDE_TERMS = [ - "interview", "phone screen", "video call", "zoom link", "schedule a call", - "offer letter", "job offer", "offer of employment", "pleased to offer", - "unfortunately", "not moving forward", "other candidates", "regret to inform", - "no longer", "decided not to", "decided to go with", - "opportunity", "interested in your background", "reached out", "great fit", - "exciting role", "love to connect", - "assessment", "questionnaire", "culture fit", "culture-fit", "online assessment", - "application received", "thank you for applying", "application confirmation", - "you applied", "your application for", - "reschedule", "rescheduled", "new time", "moved to", "postponed", "new date", - "job digest", "jobs you may like", "recommended jobs", "jobs for you", - "new jobs", "job alert", - "came across your profile", "reaching out about", "great fit for a role", - "exciting opportunity", - "welcome to the team", "start date", "onboarding", "first day", "we're excited to have you", - "application", "recruiter", "recruiting", "hiring", "candidate", -] - - -# ── Public API ──────────────────────────────────────────────────────────────── - -def test_connection(acc: dict) -> tuple[bool, str, int | None]: - """Connect, login, select folder. Returns (ok, human_message, message_count|None).""" - host = acc.get("host", "") - port = int(acc.get("port", 993)) - use_ssl = acc.get("use_ssl", True) - username = acc.get("username", "") - password = acc.get("password", "") - folder = acc.get("folder", "INBOX") - if not host or not username or not password: - return False, "Host, username, and password are all required.", None - try: - conn = (imaplib.IMAP4_SSL if use_ssl else imaplib.IMAP4)(host, port) - conn.login(username, password) - _, data = conn.select(folder, readonly=True) - count_raw = data[0].decode() if data and data[0] else "0" - count = int(count_raw) if count_raw.isdigit() else 0 - conn.logout() - return True, f"Connected — {count:,} message(s) in {folder}.", count - except Exception as exc: - return False, str(exc), None - - -def fetch_account_stream( - acc: dict, - days_back: int, - limit: int, - known_keys: set[str], -) -> Iterator[dict]: - """Generator — yields progress dicts while fetching emails via IMAP. - - Mutates `known_keys` in place for cross-account dedup within one fetch session. - - Yields event dicts with "type" key: - {"type": "start", "account": str, "total_uids": int} - {"type": "progress", "account": str, "fetched": int, "total_uids": int} - {"type": "done", "account": str, "added": int, "skipped": int, "emails": list} - """ - name = acc.get("name", acc.get("username", "?")) - host = acc.get("host", "imap.gmail.com") - port = int(acc.get("port", 993)) - use_ssl = acc.get("use_ssl", True) - username = acc["username"] - password = acc["password"] - folder = acc.get("folder", "INBOX") - since = (datetime.now() - timedelta(days=days_back)).strftime("%d-%b-%Y") - - conn = (imaplib.IMAP4_SSL if use_ssl else imaplib.IMAP4)(host, port) - conn.login(username, password) - conn.select(folder, readonly=True) - - seen_uids: dict[bytes, None] = {} - for term in _WIDE_TERMS: - try: - _, data = conn.search(None, f'(SUBJECT "{term}" SINCE "{since}")') - for uid in (data[0] or b"").split(): - seen_uids[uid] = None - except Exception: - pass - - uids = list(seen_uids.keys())[: limit * 3] - yield {"type": "start", "account": name, "total_uids": len(uids)} - - emails: list[dict] = [] - skipped = 0 - for i, uid in enumerate(uids): - if len(emails) >= limit: - break - if i % 5 == 0: - yield {"type": "progress", "account": name, "fetched": len(emails), "total_uids": len(uids)} - try: - _, raw_data = conn.fetch(uid, "(RFC822)") - if not raw_data or not raw_data[0]: - continue - msg = _email_lib.message_from_bytes(raw_data[0][1]) - subj = _decode_str(msg.get("Subject", "")) - from_addr = _decode_str(msg.get("From", "")) - date = _decode_str(msg.get("Date", "")) - body = extract_body(msg)[:800] - entry = {"subject": subj, "body": body, "from_addr": from_addr, - "date": date, "account": name} - k = entry_key(entry) - if k not in known_keys: - known_keys.add(k) - emails.append(entry) - else: - skipped += 1 - except Exception: - skipped += 1 - - try: - conn.logout() - except Exception: - pass - - yield {"type": "done", "account": name, "added": len(emails), "skipped": skipped, - "emails": emails} +"""Backward-compat shim -- logic moved to app/data/fetch.py.""" +import imaplib # noqa: F401 -- re-exported so existing patch("app.imap_fetch.imaplib...") calls still work +from app.data.fetch import ( # noqa: F401 + entry_key, + fetch_account_stream, + test_connection, + _decode_str, + _WIDE_TERMS, +) diff --git a/tests/test_api.py b/tests/test_api.py index 9d5c928..b1ad948 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -8,10 +8,13 @@ from app import api as api_module # noqa: F401 def reset_globals(tmp_path): from app import api from app.data import label as label_module + from app.data import fetch as fetch_module api.set_data_dir(tmp_path) label_module.set_data_dir(tmp_path) label_module.set_config_dir(tmp_path) label_module.reset_last_action() + fetch_module.set_data_dir(tmp_path) + fetch_module.set_config_dir(tmp_path) yield label_module.reset_last_action() @@ -273,7 +276,7 @@ def test_account_test_success(client): from unittest.mock import MagicMock, patch mock_conn = MagicMock() mock_conn.select.return_value = ("OK", [b"99"]) - with patch("app.imap_fetch.imaplib.IMAP4_SSL", return_value=mock_conn): + with patch("app.data.fetch.imaplib.IMAP4_SSL", return_value=mock_conn): r = client.post("/api/accounts/test", json={"account": { "host": "imap.example.com", "port": 993, "use_ssl": True, "username": "u@example.com", "password": "pw", "folder": "INBOX", @@ -322,7 +325,7 @@ def test_fetch_stream_with_mock_imap(client, config_dir, data_dir): mock_conn.search.return_value = ("OK", [b"1"]) mock_conn.fetch.return_value = ("OK", [(b"1 (RFC822 {N})", raw_msg)]) - with patch("app.imap_fetch.imaplib.IMAP4_SSL", return_value=mock_conn): + with patch("app.data.fetch.imaplib.IMAP4_SSL", return_value=mock_conn): r = client.get("/api/fetch/stream?accounts=Mock&days_back=30&limit=50") assert r.status_code == 200 diff --git a/tests/test_data_fetch.py b/tests/test_data_fetch.py new file mode 100644 index 0000000..737a797 --- /dev/null +++ b/tests/test_data_fetch.py @@ -0,0 +1,95 @@ +"""Tests for app/data/fetch.py""" +import json +import yaml +import pytest +from fastapi.testclient import TestClient +from unittest.mock import MagicMock, patch + + +@pytest.fixture(autouse=True) +def reset_globals(tmp_path): + from app.data import fetch as fetch_module + fetch_module.set_data_dir(tmp_path) + fetch_module.set_config_dir(tmp_path) + yield + + +@pytest.fixture +def client(): + from app.api import app + return TestClient(app) + + +def _parse_sse(content: bytes) -> list[dict]: + events = [] + for line in content.decode().splitlines(): + if line.startswith("data: "): + events.append(json.loads(line[6:])) + return events + + +def test_account_test_missing_fields(client): + r = client.post("/api/accounts/test", + json={"account": {"host": "", "username": "", "password": ""}}) + assert r.status_code == 200 + data = r.json() + assert data["ok"] is False + assert "required" in data["message"].lower() + + +def test_account_test_success(client): + mock_conn = MagicMock() + mock_conn.select.return_value = ("OK", [b"99"]) + with patch("app.data.fetch.imaplib.IMAP4_SSL", return_value=mock_conn): + r = client.post("/api/accounts/test", json={"account": { + "host": "imap.example.com", "port": 993, "use_ssl": True, + "username": "u@example.com", "password": "pw", "folder": "INBOX", + }}) + assert r.status_code == 200 + data = r.json() + assert data["ok"] is True + assert data["count"] == 99 + + +def test_fetch_stream_no_accounts_configured(client, tmp_path): + r = client.get("/api/fetch/stream?accounts=NoSuchAccount&days_back=30&limit=10") + assert r.status_code == 200 + events = _parse_sse(r.content) + complete = next((e for e in events if e["type"] == "complete"), None) + assert complete is not None + assert complete["total_added"] == 0 + + +def test_fetch_stream_with_mock_imap(client, tmp_path): + from app.data import fetch as fetch_module + fetch_module.set_config_dir(tmp_path) + cfg = {"accounts": [{"name": "Mock", "host": "h", "port": 993, "use_ssl": True, + "username": "u", "password": "p", "folder": "INBOX", + "days_back": 30}], "max_per_account": 50} + (tmp_path / "label_tool.yaml").write_text(yaml.dump(cfg)) + raw_msg = (b"Subject: Interview\r\nFrom: a@b.com\r\n" + b"Date: Mon, 1 Mar 2026 12:00:00 +0000\r\n\r\nBody") + mock_conn = MagicMock() + mock_conn.search.return_value = ("OK", [b"1"]) + mock_conn.fetch.return_value = ("OK", [(b"1 (RFC822 {N})", raw_msg)]) + with patch("app.data.fetch.imaplib.IMAP4_SSL", return_value=mock_conn): + r = client.get("/api/fetch/stream?accounts=Mock&days_back=30&limit=50") + assert r.status_code == 200 + events = _parse_sse(r.content) + types = [e["type"] for e in events] + assert "start" in types + assert "done" in types + assert "complete" in types + + +def test_entry_key_deterministic(): + from app.data.fetch import entry_key + e = {"subject": "Test", "body": "Hello world"} + assert entry_key(e) == entry_key(e) + + +def test_entry_key_differs_by_subject(): + from app.data.fetch import entry_key + a = {"subject": "A", "body": "same body"} + b = {"subject": "B", "body": "same body"} + assert entry_key(a) != entry_key(b)