diff --git a/app/api.py b/app/api.py index 15c6de9..5355628 100644 --- a/app/api.py +++ b/app/api.py @@ -284,6 +284,51 @@ def test_account(req: AccountTestRequest): return {"ok": ok, "message": message, "count": count} +from fastapi.responses import StreamingResponse + + +@app.get("/api/fetch/stream") +def fetch_stream( + accounts: str = Query(default=""), + days_back: int = Query(default=90, ge=1, le=365), + limit: int = Query(default=150, ge=1, le=1000), + mode: str = Query(default="wide"), +): + from app.imap_fetch import fetch_account_stream + + selected_names = {n.strip() for n in accounts.split(",") if n.strip()} + config = get_config() # reuse existing endpoint logic + selected = [a for a in config["accounts"] if a.get("name") in selected_names] + + def generate(): + known_keys = {_item_id(x) for x in _read_jsonl(_queue_file())} + total_added = 0 + + for acc in selected: + try: + batch_emails: list[dict] = [] + for event in fetch_account_stream(acc, days_back, limit, known_keys): + if event["type"] == "done": + batch_emails = event.pop("emails", []) + total_added += event["added"] + yield f"data: {json.dumps(event)}\n\n" + # Write new emails to queue after each account + if batch_emails: + existing = _read_jsonl(_queue_file()) + _write_jsonl(_queue_file(), existing + batch_emails) + except Exception as exc: + error_event = {"type": "error", "account": acc.get("name", "?"), + "message": str(exc)} + yield f"data: {json.dumps(error_event)}\n\n" + + queue_size = len(_read_jsonl(_queue_file())) + complete = {"type": "complete", "total_added": total_added, "queue_size": queue_size} + yield f"data: {json.dumps(complete)}\n\n" + + return StreamingResponse(generate(), media_type="text/event-stream", + headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}) + + # Static SPA — MUST be last (catches all unmatched paths) _DIST = _ROOT / "web" / "dist" if _DIST.exists(): diff --git a/tests/test_api.py b/tests/test_api.py index c609530..22a8cd4 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -276,3 +276,52 @@ def test_account_test_success(client): data = r.json() assert data["ok"] is True assert data["count"] == 99 + + +# ── /api/fetch/stream (SSE) ────────────────────────────────────────────────── + +def _parse_sse(content: bytes) -> list[dict]: + """Parse SSE response body into list of event dicts.""" + events = [] + for line in content.decode().splitlines(): + if line.startswith("data: "): + events.append(json.loads(line[6:])) + return events + + +def test_fetch_stream_no_accounts_configured(client, config_dir): + """With no config, stream should immediately complete with 0 added.""" + r = client.get("/api/fetch/stream?accounts=NoSuchAccount&days_back=30&limit=10") + assert r.status_code == 200 + events = _parse_sse(r.content) + complete = next((e for e in events if e["type"] == "complete"), None) + assert complete is not None + assert complete["total_added"] == 0 + + +def test_fetch_stream_with_mock_imap(client, config_dir, data_dir): + """With one configured account, stream should yield start/done/complete events.""" + import yaml + from unittest.mock import MagicMock, patch + + # Write a config with one account + cfg = {"accounts": [{"name": "Mock", "host": "h", "port": 993, "use_ssl": True, + "username": "u", "password": "p", "folder": "INBOX", + "days_back": 30}], "max_per_account": 50} + (config_dir / "label_tool.yaml").write_text(yaml.dump(cfg)) + + raw_msg = (b"Subject: Interview\r\nFrom: a@b.com\r\n" + b"Date: Mon, 1 Mar 2026 12:00:00 +0000\r\n\r\nBody") + mock_conn = MagicMock() + mock_conn.search.return_value = ("OK", [b"1"]) + mock_conn.fetch.return_value = ("OK", [(b"1 (RFC822 {N})", raw_msg)]) + + with patch("app.imap_fetch.imaplib.IMAP4_SSL", return_value=mock_conn): + r = client.get("/api/fetch/stream?accounts=Mock&days_back=30&limit=50") + + assert r.status_code == 200 + events = _parse_sse(r.content) + types = [e["type"] for e in events] + assert "start" in types + assert "done" in types + assert "complete" in types