refactor: consolidate HTML extraction into app/utils.py

Rename _strip_html/_extract_body to strip_html/extract_body (public API).
Remove duplicate _TextExtractor, strip_html, and _extract_body from
imap_fetch.py; import from app.utils instead. Update test_label_tool.py
to use the new public names.
This commit is contained in:
pyr0ball 2026-04-08 06:52:15 -07:00
parent ae0ac19505
commit 25880e377d
3 changed files with 23 additions and 79 deletions

View file

@ -1,6 +1,6 @@
"""Avocet — IMAP fetch utilities. """Avocet — IMAP fetch utilities.
Shared between app/api.py (FastAPI SSE endpoint) and app/label_tool.py (Streamlit). Shared between app/api.py (FastAPI SSE endpoint) and the label UI.
No Streamlit imports here — stdlib + imaplib only. No Streamlit imports here — stdlib + imaplib only.
""" """
from __future__ import annotations from __future__ import annotations
@ -8,36 +8,11 @@ from __future__ import annotations
import email as _email_lib import email as _email_lib
import hashlib import hashlib
import imaplib import imaplib
import re
from datetime import datetime, timedelta from datetime import datetime, timedelta
from email.header import decode_header as _raw_decode from email.header import decode_header as _raw_decode
from html.parser import HTMLParser
from typing import Any, Iterator from typing import Any, Iterator
from app.utils import extract_body, strip_html # noqa: F401 (strip_html re-exported for callers)
# ── HTML → plain text ────────────────────────────────────────────────────────
class _TextExtractor(HTMLParser):
def __init__(self):
super().__init__()
self._parts: list[str] = []
def handle_data(self, data: str) -> None:
stripped = data.strip()
if stripped:
self._parts.append(stripped)
def get_text(self) -> str:
return " ".join(self._parts)
def strip_html(html_str: str) -> str:
try:
ex = _TextExtractor()
ex.feed(html_str)
return ex.get_text()
except Exception:
return re.sub(r"<[^>]+>", " ", html_str).strip()
# ── IMAP decode helpers ─────────────────────────────────────────────────────── # ── IMAP decode helpers ───────────────────────────────────────────────────────
@ -55,37 +30,6 @@ def _decode_str(value: str | None) -> str:
return " ".join(out).strip() return " ".join(out).strip()
def _extract_body(msg: Any) -> str:
if msg.is_multipart():
html_fallback: str | None = None
for part in msg.walk():
ct = part.get_content_type()
if ct == "text/plain":
try:
charset = part.get_content_charset() or "utf-8"
return part.get_payload(decode=True).decode(charset, errors="replace")
except Exception:
pass
elif ct == "text/html" and html_fallback is None:
try:
charset = part.get_content_charset() or "utf-8"
raw = part.get_payload(decode=True).decode(charset, errors="replace")
html_fallback = strip_html(raw)
except Exception:
pass
return html_fallback or ""
else:
try:
charset = msg.get_content_charset() or "utf-8"
raw = msg.get_payload(decode=True).decode(charset, errors="replace")
if msg.get_content_type() == "text/html":
return strip_html(raw)
return raw
except Exception:
pass
return ""
def entry_key(e: dict) -> str: def entry_key(e: dict) -> str:
"""Stable MD5 content-hash for dedup — matches label_tool.py _entry_key.""" """Stable MD5 content-hash for dedup — matches label_tool.py _entry_key."""
key = (e.get("subject", "") + (e.get("body", "") or "")[:100]) key = (e.get("subject", "") + (e.get("body", "") or "")[:100])
@ -193,7 +137,7 @@ def fetch_account_stream(
subj = _decode_str(msg.get("Subject", "")) subj = _decode_str(msg.get("Subject", ""))
from_addr = _decode_str(msg.get("From", "")) from_addr = _decode_str(msg.get("From", ""))
date = _decode_str(msg.get("Date", "")) date = _decode_str(msg.get("Date", ""))
body = _extract_body(msg)[:800] body = extract_body(msg)[:800]
entry = {"subject": subj, "body": body, "from_addr": from_addr, entry = {"subject": subj, "body": body, "from_addr": from_addr,
"date": date, "account": name} "date": date, "account": name}
k = entry_key(entry) k = entry_key(entry)

View file

@ -43,7 +43,7 @@ class _TextExtractor(HTMLParser):
return "\n".join(ln for ln in lines if ln) return "\n".join(ln for ln in lines if ln)
def _strip_html(html_str: str) -> str: def strip_html(html_str: str) -> str:
"""Convert HTML email body to plain text. Pure stdlib, no dependencies.""" """Convert HTML email body to plain text. Pure stdlib, no dependencies."""
try: try:
extractor = _TextExtractor() extractor = _TextExtractor()
@ -53,7 +53,7 @@ def _strip_html(html_str: str) -> str:
return re.sub(r"<[^>]+>", " ", html_str).strip() return re.sub(r"<[^>]+>", " ", html_str).strip()
def _extract_body(msg: Any) -> str: def extract_body(msg: Any) -> str:
"""Return plain-text body. Strips HTML when no text/plain part exists.""" """Return plain-text body. Strips HTML when no text/plain part exists."""
if msg.is_multipart(): if msg.is_multipart():
html_fallback: str | None = None html_fallback: str | None = None
@ -69,7 +69,7 @@ def _extract_body(msg: Any) -> str:
try: try:
charset = part.get_content_charset() or "utf-8" charset = part.get_content_charset() or "utf-8"
raw = part.get_payload(decode=True).decode(charset, errors="replace") raw = part.get_payload(decode=True).decode(charset, errors="replace")
html_fallback = _strip_html(raw) html_fallback = strip_html(raw)
except Exception: except Exception:
pass pass
return html_fallback or "" return html_fallback or ""
@ -78,7 +78,7 @@ def _extract_body(msg: Any) -> str:
charset = msg.get_content_charset() or "utf-8" charset = msg.get_content_charset() or "utf-8"
raw = msg.get_payload(decode=True).decode(charset, errors="replace") raw = msg.get_payload(decode=True).decode(charset, errors="replace")
if msg.get_content_type() == "text/html": if msg.get_content_type() == "text/html":
return _strip_html(raw) return strip_html(raw)
return raw return raw
except Exception: except Exception:
pass pass

View file

@ -5,83 +5,83 @@ These functions are stdlib-only and safe to test without an IMAP connection.
from email.mime.multipart import MIMEMultipart from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText from email.mime.text import MIMEText
from app.utils import _extract_body, _strip_html from app.utils import extract_body, strip_html
# ── _strip_html ────────────────────────────────────────────────────────────── # ── strip_html ──────────────────────────────────────────────────────────────
def test_strip_html_removes_tags(): def test_strip_html_removes_tags():
assert _strip_html("<p>Hello <b>world</b></p>") == "Hello world" assert strip_html("<p>Hello <b>world</b></p>") == "Hello world"
def test_strip_html_skips_script_content(): def test_strip_html_skips_script_content():
result = _strip_html("<script>doEvil()</script><p>real</p>") result = strip_html("<script>doEvil()</script><p>real</p>")
assert "doEvil" not in result assert "doEvil" not in result
assert "real" in result assert "real" in result
def test_strip_html_skips_style_content(): def test_strip_html_skips_style_content():
result = _strip_html("<style>.foo{color:red}</style><p>visible</p>") result = strip_html("<style>.foo{color:red}</style><p>visible</p>")
assert ".foo" not in result assert ".foo" not in result
assert "visible" in result assert "visible" in result
def test_strip_html_handles_br_as_newline(): def test_strip_html_handles_br_as_newline():
result = _strip_html("line1<br>line2") result = strip_html("line1<br>line2")
assert "line1" in result assert "line1" in result
assert "line2" in result assert "line2" in result
def test_strip_html_decodes_entities(): def test_strip_html_decodes_entities():
# convert_charrefs=True on HTMLParser handles &amp; etc. # convert_charrefs=True on HTMLParser handles &amp; etc.
result = _strip_html("<p>Hello &amp; welcome</p>") result = strip_html("<p>Hello &amp; welcome</p>")
assert "&amp;" not in result assert "&amp;" not in result
assert "Hello" in result assert "Hello" in result
assert "welcome" in result assert "welcome" in result
def test_strip_html_empty_string(): def test_strip_html_empty_string():
assert _strip_html("") == "" assert strip_html("") == ""
def test_strip_html_plain_text_passthrough(): def test_strip_html_plain_text_passthrough():
assert _strip_html("no tags here") == "no tags here" assert strip_html("no tags here") == "no tags here"
# ── _extract_body ──────────────────────────────────────────────────────────── # ── extract_body ────────────────────────────────────────────────────────────
def test_extract_body_prefers_plain_over_html(): def test_extract_body_prefers_plain_over_html():
msg = MIMEMultipart("alternative") msg = MIMEMultipart("alternative")
msg.attach(MIMEText("plain body", "plain")) msg.attach(MIMEText("plain body", "plain"))
msg.attach(MIMEText("<html><body>html body</body></html>", "html")) msg.attach(MIMEText("<html><body>html body</body></html>", "html"))
assert _extract_body(msg) == "plain body" assert extract_body(msg) == "plain body"
def test_extract_body_falls_back_to_html_when_no_plain(): def test_extract_body_falls_back_to_html_when_no_plain():
msg = MIMEMultipart("alternative") msg = MIMEMultipart("alternative")
msg.attach(MIMEText("<html><body><p>HTML only email</p></body></html>", "html")) msg.attach(MIMEText("<html><body><p>HTML only email</p></body></html>", "html"))
result = _extract_body(msg) result = extract_body(msg)
assert "HTML only email" in result assert "HTML only email" in result
assert "<" not in result # no raw HTML tags leaked through assert "<" not in result # no raw HTML tags leaked through
def test_extract_body_non_multipart_html_stripped(): def test_extract_body_non_multipart_html_stripped():
msg = MIMEText("<html><body><p>Solo HTML</p></body></html>", "html") msg = MIMEText("<html><body><p>Solo HTML</p></body></html>", "html")
result = _extract_body(msg) result = extract_body(msg)
assert "Solo HTML" in result assert "Solo HTML" in result
assert "<html>" not in result assert "<html>" not in result
def test_extract_body_non_multipart_plain_unchanged(): def test_extract_body_non_multipart_plain_unchanged():
msg = MIMEText("just plain text", "plain") msg = MIMEText("just plain text", "plain")
assert _extract_body(msg) == "just plain text" assert extract_body(msg) == "just plain text"
def test_extract_body_empty_message(): def test_extract_body_empty_message():
msg = MIMEText("", "plain") msg = MIMEText("", "plain")
assert _extract_body(msg) == "" assert extract_body(msg) == ""
def test_extract_body_multipart_empty_returns_empty(): def test_extract_body_multipart_empty_returns_empty():
msg = MIMEMultipart("alternative") msg = MIMEMultipart("alternative")
assert _extract_body(msg) == "" assert extract_body(msg) == ""