From ab764cb8f6423f10098a049680f15fdf81f6a2f3 Mon Sep 17 00:00:00 2001
From: pyr0ball
Date: Fri, 27 Feb 2026 15:15:49 -0800
Subject: [PATCH] =?UTF-8?q?feat:=20targeted=20fetch=20=E2=80=94=20date=20r?=
 =?UTF-8?q?ange=20+=20sender/subject=20filter=20for=20historical=20email?=
 =?UTF-8?q?=20pulls?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 app/label_tool.py | 168 ++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 168 insertions(+)

diff --git a/app/label_tool.py b/app/label_tool.py
index 30f2fa9..e534438 100644
--- a/app/label_tool.py
+++ b/app/label_tool.py
@@ -220,6 +220,88 @@ def _fetch_account(cfg: dict, days: int, limit: int, known_keys: set[str],
     return emails
 
 
+def _fetch_targeted(
+    cfg: dict,
+    since_dt: datetime, before_dt: datetime,
+    term: str, field: str,
+    limit: int,
+    known_keys: set[str],
+    progress_cb=None,
+) -> list[dict]:
+    """Fetch emails within a date range, optionally filtered by sender/subject.
+
+    field: "from" | "subject" | "either" | "none"
+    """
+    # RFC 3501 date-text requires English month abbreviations; strftime("%b")
+    # is locale-dependent, so format the IMAP dates by hand.
+    months = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
+              "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")
+    since = f"{since_dt.day}-{months[since_dt.month - 1]}-{since_dt.year}"
+    before = f"{before_dt.day}-{months[before_dt.month - 1]}-{before_dt.year}"
+    host = cfg.get("host", "imap.gmail.com")
+    port = int(cfg.get("port", 993))
+    use_ssl = cfg.get("use_ssl", True)
+    username = cfg["username"]
+    password = cfg["password"]
+    name = cfg.get("name", username)
+
+    conn = (imaplib.IMAP4_SSL if use_ssl else imaplib.IMAP4)(host, port)
+    conn.login(username, password)
+    conn.select("INBOX", readonly=True)  # NOTE(review): no try/finally — an exception below leaks the connection
+
+    # Escape backslashes and quotes so a user-supplied term cannot break out
+    # of the IMAP quoted-string syntax (RFC 3501).
+    term = (term or "").replace("\\", "\\\\").replace('"', '\\"')
+    date_part = f'SINCE "{since}" BEFORE "{before}"'
+    if term and field == "from":
+        search_str = f'(FROM "{term}") {date_part}'
+    elif term and field == "subject":
+        search_str = f'(SUBJECT "{term}") {date_part}'
+    elif term and field == "either":
+        search_str = f'(OR (FROM "{term}") (SUBJECT "{term}")) {date_part}'
+    else:
+        search_str = date_part
+
+    try:
+        _, data = conn.search(None, search_str)
+        uids = (data[0] or b"").split()
+    except Exception:
+        uids = []  # best-effort: treat a failed/unsupported SEARCH as no matches
+
+    emails: list[dict] = []
+    for i, uid in enumerate(uids):
+        if len(emails) >= limit:
+            break
+        if progress_cb:
+            progress_cb(i / max(len(uids), 1), f"{name}: {len(emails)} fetched…")
+        try:
+            _, raw_data = conn.fetch(uid, "(RFC822)")
+            if not raw_data or not raw_data[0]:
+                continue
+            msg = _email_lib.message_from_bytes(raw_data[0][1])
+            subj = _decode_str(msg.get("Subject", ""))
+            from_addr = _decode_str(msg.get("From", ""))
+            date = _decode_str(msg.get("Date", ""))
+            body = _extract_body(msg)[:800]
+            entry = {
+                "subject": subj, "body": body,
+                "from_addr": from_addr, "date": date,
+                "account": name,
+            }
+            key = _entry_key(entry)
+            if key not in known_keys:
+                known_keys.add(key)
+                emails.append(entry)
+        except Exception:
+            pass
+
+    try:
+        conn.logout()
+    except Exception:
+        pass
+    return emails
+
+
 # ── Queue / score file helpers ───────────────────────────────────────────────
 
 def _entry_key(e: dict) -> str:
@@ -429,6 +511,92 @@ with tab_fetch:
         else:
             status.update(label="No new emails found (all already in queue or score file)", state="complete")
 
+    # ── Targeted fetch ───────────────────────────────────────────────────────
+    st.divider()
+    with st.expander("🎯 Targeted Fetch — date range + keyword"):
+        st.caption(
+            "Pull emails within a specific date window, optionally filtered by "
+            "sender or subject. Use this to retrieve historical hiring threads."
+        )
+
+        _t1, _t2 = st.columns(2)
+        _one_year_ago = (datetime.now() - timedelta(days=365)).date()
+        t_since = _t1.date_input("From date", value=_one_year_ago, key="t_since")
+        t_before = _t2.date_input("To date", value=datetime.now().date(), key="t_before")
+
+        t_term = st.text_input(
+            "Filter by keyword (optional)",
+            placeholder="e.g. Stateside",
+            key="t_term",
+        )
+        _tf1, _tf2 = st.columns(2)
+        t_field_label = _tf1.selectbox(
+            "Search in",
+            ["Either (from or subject)", "Sender/from", "Subject line"],
+            key="t_field",
+        )
+        t_limit = _tf2.number_input("Max emails", min_value=10, max_value=1000, value=300, key="t_limit")
+
+        t_accs = st.multiselect("Accounts", all_accs, default=all_accs, key="t_accs")
+
+        _field_map = {
+            "Either (from or subject)": "either",
+            "Sender/from": "from",
+            "Subject line": "subject",
+        }
+
+        _t_invalid = not accounts or not t_accs or t_since >= t_before
+        if st.button("🎯 Targeted Fetch", disabled=_t_invalid, type="primary", key="btn_targeted"):
+            _t_since_dt = datetime(t_since.year, t_since.month, t_since.day)
+            _t_before_dt = datetime(t_before.year, t_before.month, t_before.day)
+            _t_field = _field_map[t_field_label]
+
+            existing_keys = {_entry_key(e) for e in st.session_state.queue}
+            existing_keys.update(st.session_state.labeled_keys)
+
+            fetched_all: list[dict] = []
+            status = st.status("Fetching…", expanded=True)
+            _live = status.empty()
+
+            for acc in accounts:
+                name = acc.get("name", acc.get("username"))
+                if name not in t_accs:
+                    continue
+                status.write(f"Connecting to **{name}**…")
+                try:
+                    emails = _fetch_targeted(
+                        acc,
+                        since_dt=_t_since_dt, before_dt=_t_before_dt,
+                        term=t_term.strip(), field=_t_field,
+                        limit=int(t_limit),
+                        known_keys=existing_keys,
+                        progress_cb=lambda p, msg: _live.markdown(f"⏳ {msg}"),
+                    )
+                    _live.empty()
+                    fetched_all.extend(emails)
+                    status.write(f"✓ {name}: {len(emails)} new emails")
+                except Exception as e:
+                    _live.empty()
+                    status.write(f"✗ {name}: {e}")
+
+            if fetched_all:
+                _save_jsonl(_QUEUE_FILE, st.session_state.queue + fetched_all)
+                st.session_state.queue = _load_jsonl(_QUEUE_FILE)
+                labeled_keys = st.session_state.labeled_keys
+                for i, entry in enumerate(st.session_state.queue):
+                    if _entry_key(entry) not in labeled_keys:
+                        st.session_state.idx = i
+                        break
+                status.update(
+                    label=f"Done — {len(fetched_all)} new emails added to queue",
+                    state="complete",
+                )
+            else:
+                status.update(
+                    label="No new emails found in that date range",
+                    state="complete",
+                )
+
 # ══════════════════════════════════════════════════════════════════════════════
 # LABEL TAB
 