feat: add GET /api/fetch/stream SSE endpoint for real-time IMAP progress

This commit is contained in:
pyr0ball 2026-03-04 12:05:23 -08:00
parent 965362f5e3
commit f38c73db97
2 changed files with 94 additions and 0 deletions

View file

@ -284,6 +284,51 @@ def test_account(req: AccountTestRequest):
return {"ok": ok, "message": message, "count": count} return {"ok": ok, "message": message, "count": count}
from fastapi.responses import StreamingResponse
@app.get("/api/fetch/stream")
def fetch_stream(
accounts: str = Query(default=""),
days_back: int = Query(default=90, ge=1, le=365),
limit: int = Query(default=150, ge=1, le=1000),
mode: str = Query(default="wide"),
):
from app.imap_fetch import fetch_account_stream
selected_names = {n.strip() for n in accounts.split(",") if n.strip()}
config = get_config() # reuse existing endpoint logic
selected = [a for a in config["accounts"] if a.get("name") in selected_names]
def generate():
known_keys = {_item_id(x) for x in _read_jsonl(_queue_file())}
total_added = 0
for acc in selected:
try:
batch_emails: list[dict] = []
for event in fetch_account_stream(acc, days_back, limit, known_keys):
if event["type"] == "done":
batch_emails = event.pop("emails", [])
total_added += event["added"]
yield f"data: {json.dumps(event)}\n\n"
# Write new emails to queue after each account
if batch_emails:
existing = _read_jsonl(_queue_file())
_write_jsonl(_queue_file(), existing + batch_emails)
except Exception as exc:
error_event = {"type": "error", "account": acc.get("name", "?"),
"message": str(exc)}
yield f"data: {json.dumps(error_event)}\n\n"
queue_size = len(_read_jsonl(_queue_file()))
complete = {"type": "complete", "total_added": total_added, "queue_size": queue_size}
yield f"data: {json.dumps(complete)}\n\n"
return StreamingResponse(generate(), media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})
# Static SPA — MUST be last (catches all unmatched paths) # Static SPA — MUST be last (catches all unmatched paths)
_DIST = _ROOT / "web" / "dist" _DIST = _ROOT / "web" / "dist"
if _DIST.exists(): if _DIST.exists():

View file

@ -276,3 +276,52 @@ def test_account_test_success(client):
data = r.json() data = r.json()
assert data["ok"] is True assert data["ok"] is True
assert data["count"] == 99 assert data["count"] == 99
# ── /api/fetch/stream (SSE) ──────────────────────────────────────────────────
def _parse_sse(content: bytes) -> list[dict]:
"""Parse SSE response body into list of event dicts."""
events = []
for line in content.decode().splitlines():
if line.startswith("data: "):
events.append(json.loads(line[6:]))
return events
def test_fetch_stream_no_accounts_configured(client, config_dir):
"""With no config, stream should immediately complete with 0 added."""
r = client.get("/api/fetch/stream?accounts=NoSuchAccount&days_back=30&limit=10")
assert r.status_code == 200
events = _parse_sse(r.content)
complete = next((e for e in events if e["type"] == "complete"), None)
assert complete is not None
assert complete["total_added"] == 0
def test_fetch_stream_with_mock_imap(client, config_dir, data_dir):
"""With one configured account, stream should yield start/done/complete events."""
import yaml
from unittest.mock import MagicMock, patch
# Write a config with one account
cfg = {"accounts": [{"name": "Mock", "host": "h", "port": 993, "use_ssl": True,
"username": "u", "password": "p", "folder": "INBOX",
"days_back": 30}], "max_per_account": 50}
(config_dir / "label_tool.yaml").write_text(yaml.dump(cfg))
raw_msg = (b"Subject: Interview\r\nFrom: a@b.com\r\n"
b"Date: Mon, 1 Mar 2026 12:00:00 +0000\r\n\r\nBody")
mock_conn = MagicMock()
mock_conn.search.return_value = ("OK", [b"1"])
mock_conn.fetch.return_value = ("OK", [(b"1 (RFC822 {N})", raw_msg)])
with patch("app.imap_fetch.imaplib.IMAP4_SSL", return_value=mock_conn):
r = client.get("/api/fetch/stream?accounts=Mock&days_back=30&limit=50")
assert r.status_code == 200
events = _parse_sse(r.content)
types = [e["type"] for e in events]
assert "start" in types
assert "done" in types
assert "complete" in types