fix(avocet): strip HTML from email bodies — stdlib HTMLParser, no deps
This commit is contained in:
parent
47973aeba6
commit
682a958c28
2 changed files with 310 additions and 18 deletions
|
|
@ -14,6 +14,7 @@ from __future__ import annotations
|
||||||
import email as _email_lib
|
import email as _email_lib
|
||||||
import hashlib
|
import hashlib
|
||||||
import html as _html
|
import html as _html
|
||||||
|
from html.parser import HTMLParser
|
||||||
import imaplib
|
import imaplib
|
||||||
import json
|
import json
|
||||||
import re
|
import re
|
||||||
|
|
@ -23,6 +24,9 @@ from email.header import decode_header as _raw_decode
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
|
import os
|
||||||
|
import subprocess
|
||||||
|
|
||||||
import streamlit as st
|
import streamlit as st
|
||||||
import yaml
|
import yaml
|
||||||
|
|
||||||
|
|
@ -43,8 +47,9 @@ LABELS = [
|
||||||
"survey_received",
|
"survey_received",
|
||||||
"neutral",
|
"neutral",
|
||||||
"event_rescheduled",
|
"event_rescheduled",
|
||||||
"unrelated",
|
|
||||||
"digest",
|
"digest",
|
||||||
|
"new_lead",
|
||||||
|
"hired",
|
||||||
]
|
]
|
||||||
|
|
||||||
_LABEL_META: dict[str, dict] = {
|
_LABEL_META: dict[str, dict] = {
|
||||||
|
|
@ -55,8 +60,9 @@ _LABEL_META: dict[str, dict] = {
|
||||||
"survey_received": {"emoji": "📋", "color": "#9C27B0", "key": "5"},
|
"survey_received": {"emoji": "📋", "color": "#9C27B0", "key": "5"},
|
||||||
"neutral": {"emoji": "⬜", "color": "#607D8B", "key": "6"},
|
"neutral": {"emoji": "⬜", "color": "#607D8B", "key": "6"},
|
||||||
"event_rescheduled": {"emoji": "🔄", "color": "#FF5722", "key": "7"},
|
"event_rescheduled": {"emoji": "🔄", "color": "#FF5722", "key": "7"},
|
||||||
"unrelated": {"emoji": "🗑️", "color": "#757575", "key": "8"},
|
"digest": {"emoji": "📰", "color": "#00BCD4", "key": "8"},
|
||||||
"digest": {"emoji": "📰", "color": "#00BCD4", "key": "9"},
|
"new_lead": {"emoji": "🤝", "color": "#009688", "key": "9"},
|
||||||
|
"hired": {"emoji": "🎊", "color": "#FFC107", "key": "h"},
|
||||||
}
|
}
|
||||||
|
|
||||||
# ── HTML sanitiser ───────────────────────────────────────────────────────────
|
# ── HTML sanitiser ───────────────────────────────────────────────────────────
|
||||||
|
|
@ -78,7 +84,50 @@ def _to_html(text: str, newlines_to_br: bool = False) -> str:
|
||||||
return escaped
|
return escaped
|
||||||
|
|
||||||
|
|
||||||
# ── Wide IMAP search terms (cast a net across all 9 categories) ─────────────
|
# ── HTML → plain-text extractor ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
class _TextExtractor(HTMLParser):
|
||||||
|
"""Extract visible text from an HTML email body, preserving line breaks."""
|
||||||
|
_BLOCK = {"p","div","br","li","tr","h1","h2","h3","h4","h5","h6","blockquote"}
|
||||||
|
_SKIP = {"script","style","head","noscript"}
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__(convert_charrefs=True)
|
||||||
|
self._parts: list[str] = []
|
||||||
|
self._depth_skip = 0
|
||||||
|
|
||||||
|
def handle_starttag(self, tag, attrs):
|
||||||
|
tag = tag.lower()
|
||||||
|
if tag in self._SKIP:
|
||||||
|
self._depth_skip += 1
|
||||||
|
elif tag in self._BLOCK:
|
||||||
|
self._parts.append("\n")
|
||||||
|
|
||||||
|
def handle_endtag(self, tag):
|
||||||
|
if tag.lower() in self._SKIP:
|
||||||
|
self._depth_skip = max(0, self._depth_skip - 1)
|
||||||
|
|
||||||
|
def handle_data(self, data):
|
||||||
|
if not self._depth_skip:
|
||||||
|
self._parts.append(data)
|
||||||
|
|
||||||
|
def get_text(self) -> str:
|
||||||
|
text = "".join(self._parts)
|
||||||
|
lines = [ln.strip() for ln in text.splitlines()]
|
||||||
|
return "\n".join(ln for ln in lines if ln)
|
||||||
|
|
||||||
|
|
||||||
|
def _strip_html(html_str: str) -> str:
|
||||||
|
"""Convert HTML email body to plain text. Pure stdlib, no dependencies."""
|
||||||
|
try:
|
||||||
|
extractor = _TextExtractor()
|
||||||
|
extractor.feed(html_str)
|
||||||
|
return extractor.get_text()
|
||||||
|
except Exception:
|
||||||
|
return re.sub(r"<[^>]+>", " ", html_str).strip()
|
||||||
|
|
||||||
|
|
||||||
|
# ── Wide IMAP search terms (cast a net across all 10 categories) ────────────
|
||||||
_WIDE_TERMS = [
|
_WIDE_TERMS = [
|
||||||
# interview_scheduled
|
# interview_scheduled
|
||||||
"interview", "phone screen", "video call", "zoom link", "schedule a call",
|
"interview", "phone screen", "video call", "zoom link", "schedule a call",
|
||||||
|
|
@ -100,6 +149,11 @@ _WIDE_TERMS = [
|
||||||
# digest
|
# digest
|
||||||
"job digest", "jobs you may like", "recommended jobs", "jobs for you",
|
"job digest", "jobs you may like", "recommended jobs", "jobs for you",
|
||||||
"new jobs", "job alert",
|
"new jobs", "job alert",
|
||||||
|
# new_lead
|
||||||
|
"came across your profile", "reaching out about", "great fit for a role",
|
||||||
|
"exciting opportunity", "love to connect",
|
||||||
|
# hired / onboarding
|
||||||
|
"welcome to the team", "start date", "onboarding", "first day", "we're excited to have you",
|
||||||
# general recruitment
|
# general recruitment
|
||||||
"application", "recruiter", "recruiting", "hiring", "candidate",
|
"application", "recruiter", "recruiting", "hiring", "candidate",
|
||||||
]
|
]
|
||||||
|
|
@ -121,18 +175,32 @@ def _decode_str(value: str | None) -> str:
|
||||||
|
|
||||||
|
|
||||||
def _extract_body(msg: Any) -> str:
|
def _extract_body(msg: Any) -> str:
|
||||||
|
"""Return plain-text body. Strips HTML when no text/plain part exists."""
|
||||||
if msg.is_multipart():
|
if msg.is_multipart():
|
||||||
|
html_fallback: str | None = None
|
||||||
for part in msg.walk():
|
for part in msg.walk():
|
||||||
if part.get_content_type() == "text/plain":
|
ct = part.get_content_type()
|
||||||
|
if ct == "text/plain":
|
||||||
try:
|
try:
|
||||||
charset = part.get_content_charset() or "utf-8"
|
charset = part.get_content_charset() or "utf-8"
|
||||||
return part.get_payload(decode=True).decode(charset, errors="replace")
|
return part.get_payload(decode=True).decode(charset, errors="replace")
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
elif ct == "text/html" and html_fallback is None:
|
||||||
|
try:
|
||||||
|
charset = part.get_content_charset() or "utf-8"
|
||||||
|
raw = part.get_payload(decode=True).decode(charset, errors="replace")
|
||||||
|
html_fallback = _strip_html(raw)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
return html_fallback or ""
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
charset = msg.get_content_charset() or "utf-8"
|
charset = msg.get_content_charset() or "utf-8"
|
||||||
return msg.get_payload(decode=True).decode(charset, errors="replace")
|
raw = msg.get_payload(decode=True).decode(charset, errors="replace")
|
||||||
|
if msg.get_content_type() == "text/html":
|
||||||
|
return _strip_html(raw)
|
||||||
|
return raw
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
return ""
|
return ""
|
||||||
|
|
@ -436,7 +504,9 @@ with st.sidebar:
|
||||||
|
|
||||||
# ── Tabs ─────────────────────────────────────────────────────────────────────
|
# ── Tabs ─────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
tab_label, tab_fetch, tab_stats, tab_settings = st.tabs(["🃏 Label", "📥 Fetch", "📊 Stats", "⚙️ Settings"])
|
tab_label, tab_fetch, tab_stats, tab_settings, tab_benchmark = st.tabs(
|
||||||
|
["🃏 Label", "📥 Fetch", "📊 Stats", "⚙️ Settings", "🔬 Benchmark"]
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# ══════════════════════════════════════════════════════════════════════════════
|
# ══════════════════════════════════════════════════════════════════════════════
|
||||||
|
|
@ -669,19 +739,19 @@ with tab_label:
|
||||||
_lbl_r = _r.get("label", "")
|
_lbl_r = _r.get("label", "")
|
||||||
_counts[_lbl_r] = _counts.get(_lbl_r, 0) + 1
|
_counts[_lbl_r] = _counts.get(_lbl_r, 0) + 1
|
||||||
|
|
||||||
row1_cols = st.columns(3)
|
row1_cols = st.columns(5)
|
||||||
row2_cols = st.columns(3)
|
row2_cols = st.columns(5)
|
||||||
row3_cols = st.columns(3)
|
|
||||||
bucket_pairs = [
|
bucket_pairs = [
|
||||||
(row1_cols[0], "interview_scheduled"),
|
(row1_cols[0], "interview_scheduled"),
|
||||||
(row1_cols[1], "offer_received"),
|
(row1_cols[1], "offer_received"),
|
||||||
(row1_cols[2], "rejected"),
|
(row1_cols[2], "rejected"),
|
||||||
(row2_cols[0], "positive_response"),
|
(row1_cols[3], "positive_response"),
|
||||||
(row2_cols[1], "survey_received"),
|
(row1_cols[4], "survey_received"),
|
||||||
(row2_cols[2], "neutral"),
|
(row2_cols[0], "neutral"),
|
||||||
(row3_cols[0], "event_rescheduled"),
|
(row2_cols[1], "event_rescheduled"),
|
||||||
(row3_cols[1], "unrelated"),
|
(row2_cols[2], "digest"),
|
||||||
(row3_cols[2], "digest"),
|
(row2_cols[3], "new_lead"),
|
||||||
|
(row2_cols[4], "hired"),
|
||||||
]
|
]
|
||||||
for col, lbl in bucket_pairs:
|
for col, lbl in bucket_pairs:
|
||||||
m = _LABEL_META[lbl]
|
m = _LABEL_META[lbl]
|
||||||
|
|
@ -720,7 +790,7 @@ with tab_label:
|
||||||
nav_cols = st.columns([2, 1, 1, 1])
|
nav_cols = st.columns([2, 1, 1, 1])
|
||||||
|
|
||||||
remaining = len(unlabeled) - 1
|
remaining = len(unlabeled) - 1
|
||||||
nav_cols[0].caption(f"**{remaining}** remaining · Keys: 1–9 = label, 0 = other, S = skip, U = undo")
|
nav_cols[0].caption(f"**{remaining}** remaining · Keys: 1–9, H = label, 0 = other, S = skip, U = undo")
|
||||||
|
|
||||||
if nav_cols[1].button("↩ Undo", disabled=not st.session_state.history, use_container_width=True):
|
if nav_cols[1].button("↩ Undo", disabled=not st.session_state.history, use_container_width=True):
|
||||||
prev_idx, prev_label = st.session_state.history.pop()
|
prev_idx, prev_label = st.session_state.history.pop()
|
||||||
|
|
@ -757,7 +827,7 @@ document.addEventListener('keydown', function(e) {
|
||||||
const keyToLabel = {
|
const keyToLabel = {
|
||||||
'1':'interview_scheduled','2':'offer_received','3':'rejected',
|
'1':'interview_scheduled','2':'offer_received','3':'rejected',
|
||||||
'4':'positive_response','5':'survey_received','6':'neutral',
|
'4':'positive_response','5':'survey_received','6':'neutral',
|
||||||
'7':'event_rescheduled','8':'unrelated','9':'digest'
|
'7':'event_rescheduled','8':'digest','9':'new_lead'
|
||||||
};
|
};
|
||||||
const label = keyToLabel[e.key];
|
const label = keyToLabel[e.key];
|
||||||
if (label) {
|
if (label) {
|
||||||
|
|
@ -772,6 +842,11 @@ document.addEventListener('keydown', function(e) {
|
||||||
for (const btn of btns) {
|
for (const btn of btns) {
|
||||||
if (btn.innerText.includes('Other')) { btn.click(); break; }
|
if (btn.innerText.includes('Other')) { btn.click(); break; }
|
||||||
}
|
}
|
||||||
|
} else if (e.key.toLowerCase() === 'h') {
|
||||||
|
const btns = window.parent.document.querySelectorAll('button');
|
||||||
|
for (const btn of btns) {
|
||||||
|
if (btn.innerText.toLowerCase().includes('hired')) { btn.click(); break; }
|
||||||
|
}
|
||||||
} else if (e.key.toLowerCase() === 's') {
|
} else if (e.key.toLowerCase() === 's') {
|
||||||
const btns = window.parent.document.querySelectorAll('button');
|
const btns = window.parent.document.querySelectorAll('button');
|
||||||
for (const btn of btns) {
|
for (const btn of btns) {
|
||||||
|
|
@ -979,3 +1054,133 @@ with tab_settings:
|
||||||
if _k in ("settings_accounts", "settings_max") or _k.startswith("s_"):
|
if _k in ("settings_accounts", "settings_max") or _k.startswith("s_"):
|
||||||
del st.session_state[_k]
|
del st.session_state[_k]
|
||||||
st.rerun()
|
st.rerun()
|
||||||
|
|
||||||
|
|
||||||
|
# ══════════════════════════════════════════════════════════════════════════════
|
||||||
|
# BENCHMARK TAB
|
||||||
|
# ══════════════════════════════════════════════════════════════════════════════
|
||||||
|
|
||||||
|
with tab_benchmark:
|
||||||
|
# ── Model selection ───────────────────────────────────────────────────────
|
||||||
|
_DEFAULT_MODELS = [
|
||||||
|
"deberta-zeroshot", "deberta-small", "gliclass-large",
|
||||||
|
"bart-mnli", "bge-m3-zeroshot", "deberta-small-2pass", "deberta-base-anli",
|
||||||
|
]
|
||||||
|
_SLOW_MODELS = [
|
||||||
|
"deberta-large-ling", "mdeberta-xnli-2m", "bge-reranker",
|
||||||
|
"deberta-xlarge", "mdeberta-mnli", "xlm-roberta-anli",
|
||||||
|
]
|
||||||
|
|
||||||
|
st.subheader("🔬 Benchmark Classifier Models")
|
||||||
|
|
||||||
|
_b_include_slow = st.checkbox("Include slow / large models", value=False, key="b_include_slow")
|
||||||
|
_b_all_models = _DEFAULT_MODELS + (_SLOW_MODELS if _b_include_slow else [])
|
||||||
|
_b_selected = st.multiselect(
|
||||||
|
"Models to run",
|
||||||
|
options=_b_all_models,
|
||||||
|
default=_b_all_models,
|
||||||
|
help="Uncheck models to skip them. Slow models require --include-slow.",
|
||||||
|
)
|
||||||
|
|
||||||
|
_n_examples = len(st.session_state.labeled)
|
||||||
|
st.caption(
|
||||||
|
f"Scoring against `{_SCORE_FILE.name}` · **{_n_examples} labeled examples**"
|
||||||
|
f" · Est. time: ~{max(1, len(_b_selected))} – {max(2, len(_b_selected) * 2)} min"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Direct binary avoids conda's output interception; -u = unbuffered stdout
|
||||||
|
_CLASSIFIER_PYTHON = "/devl/miniconda3/envs/job-seeker-classifiers/bin/python"
|
||||||
|
|
||||||
|
if st.button("▶ Run Benchmark", type="primary", disabled=not _b_selected, key="b_run"):
|
||||||
|
_b_cmd = [
|
||||||
|
_CLASSIFIER_PYTHON, "-u",
|
||||||
|
str(_ROOT / "scripts" / "benchmark_classifier.py"),
|
||||||
|
"--score", "--score-file", str(_SCORE_FILE),
|
||||||
|
"--models", *_b_selected,
|
||||||
|
]
|
||||||
|
with st.status("Running benchmark…", expanded=True) as _b_status:
|
||||||
|
_b_proc = subprocess.Popen(
|
||||||
|
_b_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
|
||||||
|
text=True, cwd=str(_ROOT),
|
||||||
|
env={**os.environ, "PYTHONUNBUFFERED": "1"},
|
||||||
|
)
|
||||||
|
_b_lines: list[str] = []
|
||||||
|
_b_area = st.empty()
|
||||||
|
for _b_line in _b_proc.stdout:
|
||||||
|
_b_lines.append(_b_line)
|
||||||
|
_b_area.code("".join(_b_lines[-30:]), language="text")
|
||||||
|
_b_proc.wait()
|
||||||
|
_b_full = "".join(_b_lines)
|
||||||
|
st.session_state["bench_output"] = _b_full
|
||||||
|
if _b_proc.returncode == 0:
|
||||||
|
_b_status.update(label="Benchmark complete ✓", state="complete", expanded=False)
|
||||||
|
else:
|
||||||
|
_b_status.update(label="Benchmark failed", state="error")
|
||||||
|
|
||||||
|
# ── Results display ───────────────────────────────────────────────────────
|
||||||
|
if "bench_output" in st.session_state:
|
||||||
|
_b_out = st.session_state["bench_output"]
|
||||||
|
|
||||||
|
# Parse summary table rows: name f1 accuracy ms
|
||||||
|
_b_rows = []
|
||||||
|
for _b_l in _b_out.splitlines():
|
||||||
|
_b_m = re.match(r"^([\w-]+)\s+([\d.]+)\s+([\d.]+)\s+([\d.]+)\s*$", _b_l.strip())
|
||||||
|
if _b_m:
|
||||||
|
_b_rows.append({
|
||||||
|
"Model": _b_m.group(1),
|
||||||
|
"macro-F1": float(_b_m.group(2)),
|
||||||
|
"Accuracy": float(_b_m.group(3)),
|
||||||
|
"ms/email": float(_b_m.group(4)),
|
||||||
|
})
|
||||||
|
|
||||||
|
if _b_rows:
|
||||||
|
import pandas as _pd
|
||||||
|
_b_df = _pd.DataFrame(_b_rows).sort_values("macro-F1", ascending=False).reset_index(drop=True)
|
||||||
|
st.dataframe(
|
||||||
|
_b_df,
|
||||||
|
column_config={
|
||||||
|
"macro-F1": st.column_config.ProgressColumn(
|
||||||
|
"macro-F1", min_value=0, max_value=1, format="%.3f",
|
||||||
|
),
|
||||||
|
"Accuracy": st.column_config.ProgressColumn(
|
||||||
|
"Accuracy", min_value=0, max_value=1, format="%.3f",
|
||||||
|
),
|
||||||
|
"ms/email": st.column_config.NumberColumn("ms/email", format="%.1f"),
|
||||||
|
},
|
||||||
|
use_container_width=True, hide_index=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
with st.expander("Full benchmark output"):
|
||||||
|
st.code(_b_out, language="text")
|
||||||
|
|
||||||
|
st.divider()
|
||||||
|
|
||||||
|
# ── Tests ─────────────────────────────────────────────────────────────────
|
||||||
|
st.subheader("🧪 Run Tests")
|
||||||
|
st.caption("Runs `pytest tests/ -v` in the job-seeker env (no model downloads required).")
|
||||||
|
|
||||||
|
if st.button("▶ Run Tests", key="b_run_tests"):
|
||||||
|
_t_cmd = [
|
||||||
|
"/devl/miniconda3/envs/job-seeker/bin/pytest", "tests/", "-v", "--tb=short",
|
||||||
|
]
|
||||||
|
with st.status("Running tests…", expanded=True) as _t_status:
|
||||||
|
_t_proc = subprocess.Popen(
|
||||||
|
_t_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
|
||||||
|
text=True, cwd=str(_ROOT),
|
||||||
|
)
|
||||||
|
_t_lines: list[str] = []
|
||||||
|
_t_area = st.empty()
|
||||||
|
for _t_line in _t_proc.stdout:
|
||||||
|
_t_lines.append(_t_line)
|
||||||
|
_t_area.code("".join(_t_lines[-30:]), language="text")
|
||||||
|
_t_proc.wait()
|
||||||
|
_t_full = "".join(_t_lines)
|
||||||
|
st.session_state["test_output"] = _t_full
|
||||||
|
_t_summary = [l for l in _t_lines if "passed" in l or "failed" in l or "error" in l.lower()]
|
||||||
|
_t_label = _t_summary[-1].strip() if _t_summary else "Done"
|
||||||
|
_t_state = "error" if _t_proc.returncode != 0 else "complete"
|
||||||
|
_t_status.update(label=_t_label, state=_t_state, expanded=False)
|
||||||
|
|
||||||
|
if "test_output" in st.session_state:
|
||||||
|
with st.expander("Full test output", expanded=True):
|
||||||
|
st.code(st.session_state["test_output"], language="text")
|
||||||
|
|
|
||||||
87
tests/test_label_tool.py
Normal file
87
tests/test_label_tool.py
Normal file
|
|
@ -0,0 +1,87 @@
|
||||||
|
"""Tests for label_tool HTML extraction utilities.
|
||||||
|
|
||||||
|
These functions are stdlib-only and safe to test without an IMAP connection.
|
||||||
|
"""
|
||||||
|
from email.mime.multipart import MIMEMultipart
|
||||||
|
from email.mime.text import MIMEText
|
||||||
|
|
||||||
|
from app.label_tool import _extract_body, _strip_html
|
||||||
|
|
||||||
|
|
||||||
|
# ── _strip_html ──────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def test_strip_html_removes_tags():
|
||||||
|
assert _strip_html("<p>Hello <b>world</b></p>") == "Hello world"
|
||||||
|
|
||||||
|
|
||||||
|
def test_strip_html_skips_script_content():
|
||||||
|
result = _strip_html("<script>doEvil()</script><p>real</p>")
|
||||||
|
assert "doEvil" not in result
|
||||||
|
assert "real" in result
|
||||||
|
|
||||||
|
|
||||||
|
def test_strip_html_skips_style_content():
|
||||||
|
result = _strip_html("<style>.foo{color:red}</style><p>visible</p>")
|
||||||
|
assert ".foo" not in result
|
||||||
|
assert "visible" in result
|
||||||
|
|
||||||
|
|
||||||
|
def test_strip_html_handles_br_as_newline():
    """A <br> must split the surrounding text onto separate lines."""
    # Checking only that both substrings are present would also pass for
    # "line1line2"; pin the exact output so the line break itself is verified.
    assert _strip_html("line1<br>line2") == "line1\nline2"
|
||||||
|
|
||||||
|
|
||||||
|
def test_strip_html_decodes_entities():
    """Character references in the markup are decoded to their characters."""
    # convert_charrefs=True on HTMLParser handles &amp; etc.
    result = _strip_html("<p>Hello &amp; welcome</p>")
    assert "&amp;" not in result
    assert result == "Hello & welcome"
|
||||||
|
|
||||||
|
|
||||||
|
def test_strip_html_empty_string():
|
||||||
|
assert _strip_html("") == ""
|
||||||
|
|
||||||
|
|
||||||
|
def test_strip_html_plain_text_passthrough():
|
||||||
|
assert _strip_html("no tags here") == "no tags here"
|
||||||
|
|
||||||
|
|
||||||
|
# ── _extract_body ────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def test_extract_body_prefers_plain_over_html():
|
||||||
|
msg = MIMEMultipart("alternative")
|
||||||
|
msg.attach(MIMEText("plain body", "plain"))
|
||||||
|
msg.attach(MIMEText("<html><body>html body</body></html>", "html"))
|
||||||
|
assert _extract_body(msg) == "plain body"
|
||||||
|
|
||||||
|
|
||||||
|
def test_extract_body_falls_back_to_html_when_no_plain():
|
||||||
|
msg = MIMEMultipart("alternative")
|
||||||
|
msg.attach(MIMEText("<html><body><p>HTML only email</p></body></html>", "html"))
|
||||||
|
result = _extract_body(msg)
|
||||||
|
assert "HTML only email" in result
|
||||||
|
assert "<" not in result # no raw HTML tags leaked through
|
||||||
|
|
||||||
|
|
||||||
|
def test_extract_body_non_multipart_html_stripped():
|
||||||
|
msg = MIMEText("<html><body><p>Solo HTML</p></body></html>", "html")
|
||||||
|
result = _extract_body(msg)
|
||||||
|
assert "Solo HTML" in result
|
||||||
|
assert "<html>" not in result
|
||||||
|
|
||||||
|
|
||||||
|
def test_extract_body_non_multipart_plain_unchanged():
|
||||||
|
msg = MIMEText("just plain text", "plain")
|
||||||
|
assert _extract_body(msg) == "just plain text"
|
||||||
|
|
||||||
|
|
||||||
|
def test_extract_body_empty_message():
|
||||||
|
msg = MIMEText("", "plain")
|
||||||
|
assert _extract_body(msg) == ""
|
||||||
|
|
||||||
|
|
||||||
|
def test_extract_body_multipart_empty_returns_empty():
|
||||||
|
msg = MIMEMultipart("alternative")
|
||||||
|
assert _extract_body(msg) == ""
|
||||||
Loading…
Reference in a new issue