feat: targeted fetch — date range + sender/subject filter for historical email pulls

This commit is contained in:
pyr0ball 2026-02-27 15:15:49 -08:00
parent f97ef32100
commit ab764cb8f6

View file

@ -220,6 +220,81 @@ def _fetch_account(cfg: dict, days: int, limit: int, known_keys: set[str],
    return emails
def _fetch_targeted(
    cfg: dict,
    since_dt: datetime, before_dt: datetime,
    term: str, field: str,
    limit: int,
    known_keys: set[str],
    progress_cb=None,
) -> list[dict]:
    """Fetch emails within a date range, optionally filtered by sender/subject.

    Args:
        cfg: account config; requires "username"/"password"; optional
            "host" (default "imap.gmail.com"), "port" (993), "use_ssl"
            (True), "name" (defaults to username).
        since_dt: inclusive start of the date window.
        before_dt: exclusive end of the date window (IMAP BEFORE semantics).
        term: keyword to filter on; empty string disables keyword filtering.
        field: "from" | "subject" | "either" | "none".
        limit: stop once this many *new* emails have been collected.
        known_keys: de-dup keys of already-seen emails; mutated in place.
        progress_cb: optional callable(fraction, message) for UI progress.

    Returns:
        List of new email dicts (subject/body/from_addr/date/account)
        whose keys were not already in known_keys.

    Raises:
        Propagates imaplib errors from connect/login/select; per-message
        download or parse failures are skipped (best-effort).
    """
    since = since_dt.strftime("%d-%b-%Y")
    before = before_dt.strftime("%d-%b-%Y")
    host = cfg.get("host", "imap.gmail.com")
    port = int(cfg.get("port", 993))
    use_ssl = cfg.get("use_ssl", True)
    username = cfg["username"]
    password = cfg["password"]
    name = cfg.get("name", username)

    # BUGFIX: a term containing '"' or '\' previously corrupted the IMAP
    # SEARCH string; escape per RFC 3501 quoted-string rules.
    safe_term = term.replace("\\", "\\\\").replace('"', '\\"')

    date_part = f'SINCE "{since}" BEFORE "{before}"'
    if safe_term and field == "from":
        search_str = f'(FROM "{safe_term}") {date_part}'
    elif safe_term and field == "subject":
        search_str = f'(SUBJECT "{safe_term}") {date_part}'
    elif safe_term and field == "either":
        search_str = f'(OR (FROM "{safe_term}") (SUBJECT "{safe_term}")) {date_part}'
    else:
        search_str = date_part

    conn = (imaplib.IMAP4_SSL if use_ssl else imaplib.IMAP4)(host, port)
    emails: list[dict] = []
    # BUGFIX: guarantee logout even when login/select/search raises — the
    # original only logged out on the success path, leaking the connection.
    try:
        conn.login(username, password)
        conn.select("INBOX", readonly=True)
        try:
            _, data = conn.search(None, search_str)
            uids = (data[0] or b"").split()
        except Exception:
            # Some servers reject OR/quoting variants; treat as "no matches".
            uids = []
        for i, uid in enumerate(uids):
            if len(emails) >= limit:
                break
            if progress_cb:
                # max(..., 1) guards division by zero when uids is empty.
                progress_cb(i / max(len(uids), 1), f"{name}: {len(emails)} fetched…")
            try:
                _, raw_data = conn.fetch(uid, "(RFC822)")
                if not raw_data or not raw_data[0]:
                    continue
                msg = _email_lib.message_from_bytes(raw_data[0][1])
                subj = _decode_str(msg.get("Subject", ""))
                from_addr = _decode_str(msg.get("From", ""))
                date = _decode_str(msg.get("Date", ""))
                body = _extract_body(msg)[:800]
                entry = {
                    "subject": subj, "body": body,
                    "from_addr": from_addr, "date": date,
                    "account": name,
                }
                key = _entry_key(entry)
                if key not in known_keys:
                    known_keys.add(key)
                    emails.append(entry)
            except Exception:
                # Best-effort: skip messages that fail to download or parse.
                pass
    finally:
        try:
            conn.logout()
        except Exception:
            pass
    return emails
# ── Queue / score file helpers ───────────────────────────────────────────────
def _entry_key(e: dict) -> str:
@ -429,6 +504,92 @@ with tab_fetch:
else:
    status.update(label="No new emails found (all already in queue or score file)", state="complete")
# ── Targeted fetch ───────────────────────────────────────────────────────
# UI for _fetch_targeted: pick a date window, an optional keyword, the
# field to match it against, and which accounts to pull from.
st.divider()
with st.expander("🎯 Targeted Fetch — date range + keyword"):
    st.caption(
        "Pull emails within a specific date window, optionally filtered by "
        "sender or subject. Use this to retrieve historical hiring threads."
    )
    # Date-range pickers; default window is the trailing year.
    _t1, _t2 = st.columns(2)
    _one_year_ago = (datetime.now() - timedelta(days=365)).date()
    t_since = _t1.date_input("From date", value=_one_year_ago, key="t_since")
    t_before = _t2.date_input("To date", value=datetime.now().date(), key="t_before")
    t_term = st.text_input(
        "Filter by keyword (optional)",
        placeholder="e.g. Stateside",
        key="t_term",
    )
    _tf1, _tf2 = st.columns(2)
    t_field_label = _tf1.selectbox(
        "Search in",
        ["Either (from or subject)", "Sender/from", "Subject line"],
        key="t_field",
    )
    t_limit = _tf2.number_input("Max emails", min_value=10, max_value=1000, value=300, key="t_limit")
    t_accs = st.multiselect("Accounts", all_accs, default=all_accs, key="t_accs")
    # Map human-readable labels to the `field` argument of _fetch_targeted.
    _field_map = {
        "Either (from or subject)": "either",
        "Sender/from": "from",
        "Subject line": "subject",
    }
    # Disable the button when no accounts are configured/selected or the
    # date window is empty/inverted.
    _t_invalid = not accounts or not t_accs or t_since >= t_before
    if st.button("🎯 Targeted Fetch", disabled=_t_invalid, type="primary", key="btn_targeted"):
        # date_input returns date objects; convert to naive datetimes for
        # _fetch_targeted's strftime-based IMAP query.
        _t_since_dt = datetime(t_since.year, t_since.month, t_since.day)
        _t_before_dt = datetime(t_before.year, t_before.month, t_before.day)
        _t_field = _field_map[t_field_label]
        # Seed the de-dup key set from the current queue plus already-labeled
        # items so re-fetching the same window never creates duplicates.
        existing_keys = {_entry_key(e) for e in st.session_state.queue}
        existing_keys.update(st.session_state.labeled_keys)
        fetched_all: list[dict] = []
        status = st.status("Fetching…", expanded=True)
        _live = status.empty()  # placeholder for per-account live progress text
        for acc in accounts:
            name = acc.get("name", acc.get("username"))
            if name not in t_accs:
                continue  # account not selected in the multiselect
            status.write(f"Connecting to **{name}**…")
            try:
                emails = _fetch_targeted(
                    acc,
                    since_dt=_t_since_dt, before_dt=_t_before_dt,
                    term=t_term.strip(), field=_t_field,
                    limit=int(t_limit),
                    known_keys=existing_keys,
                    progress_cb=lambda p, msg: _live.markdown(f"{msg}"),
                )
                _live.empty()
                fetched_all.extend(emails)
                status.write(f"{name}: {len(emails)} new emails")
            except Exception as e:
                # Surface the per-account failure in the status widget and
                # continue with the remaining accounts.
                _live.empty()
                status.write(f"{name}: {e}")
        if fetched_all:
            # Persist, then reload so session state mirrors the file on disk.
            _save_jsonl(_QUEUE_FILE, st.session_state.queue + fetched_all)
            st.session_state.queue = _load_jsonl(_QUEUE_FILE)
            # Jump the labeling cursor to the first not-yet-labeled entry.
            labeled_keys = st.session_state.labeled_keys
            for i, entry in enumerate(st.session_state.queue):
                if _entry_key(entry) not in labeled_keys:
                    st.session_state.idx = i
                    break
            status.update(
                label=f"Done — {len(fetched_all)} new emails added to queue",
                state="complete",
            )
        else:
            status.update(
                label="No new emails found in that date range",
                state="complete",
            )
# ══════════════════════════════════════════════════════════════════════════════
# LABEL TAB