Rename _strip_html/_extract_body to strip_html/extract_body (public API). Remove duplicate _TextExtractor, strip_html, and _extract_body from imap_fetch.py; import from app.utils instead. Update test_label_tool.py to use the new public names.
85 lines
3 KiB
Python
85 lines
3 KiB
Python
"""Shared email utility functions for Avocet.
|
|
|
|
Pure-stdlib helpers extracted from the retired label_tool.py Streamlit app.
|
|
These are reused by the FastAPI backend and the test suite.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
from html.parser import HTMLParser
|
|
from typing import Any
|
|
|
|
|
|
# ── HTML → plain-text extractor ──────────────────────────────────────────────
|
|
|
|
class _TextExtractor(HTMLParser):
    """Extract visible text from an HTML email body, preserving line breaks.

    Text nested inside ``script``, ``style``, ``head`` and ``noscript``
    elements is discarded.  Block-level elements are turned into line
    boundaries on both their opening and closing tags, so text that follows
    a closed block never runs into the block's own text.

    Usage: ``feed()`` the markup, ``close()`` the parser, then call
    ``get_text()``.
    """

    # Elements that begin and end a visual line.
    _BLOCK = {"p", "div", "br", "li", "tr", "h1", "h2", "h3", "h4", "h5", "h6", "blockquote"}
    # Elements whose content is never shown to the reader.
    _SKIP = {"script", "style", "head", "noscript"}

    def __init__(self) -> None:
        super().__init__(convert_charrefs=True)
        self._parts: list[str] = []
        # Number of currently open _SKIP elements; data is dropped while > 0.
        self._depth_skip = 0

    def handle_starttag(self, tag: str, attrs) -> None:
        """Enter a skipped element, or start a new line for a block element."""
        tag = tag.lower()
        if tag in self._SKIP:
            self._depth_skip += 1
        elif tag in self._BLOCK:
            self._parts.append("\n")

    def handle_endtag(self, tag: str) -> None:
        """Leave a skipped element, or end the line of a block element."""
        tag = tag.lower()
        if tag in self._SKIP:
            # Clamp at zero so a stray closing tag cannot unbalance the counter.
            self._depth_skip = max(0, self._depth_skip - 1)
        elif tag in self._BLOCK:
            # Without this, "<div>a</div>b" would collapse to "ab".
            # Surplus newlines are harmless: get_text() drops blank lines.
            self._parts.append("\n")

    def handle_data(self, data: str) -> None:
        """Collect text unless it sits inside a skipped element."""
        if not self._depth_skip:
            self._parts.append(data)

    def get_text(self) -> str:
        """Return the collected text, one stripped non-empty line per row."""
        text = "".join(self._parts)
        lines = [ln.strip() for ln in text.splitlines()]
        return "\n".join(ln for ln in lines if ln)
|
|
|
|
|
|
def strip_html(html_str: str) -> str:
    """Convert HTML email body to plain text. Pure stdlib, no dependencies.

    The markup is run through ``_TextExtractor``; if that raises for any
    reason, a crude regex tag-stripper is used instead so the caller always
    gets a string back.

    Args:
        html_str: HTML markup. Empty or ``None`` input yields ``""``.

    Returns:
        The text produced by ``_TextExtractor.get_text()``, or the
        regex-stripped input when the parser fails.
    """
    if not html_str:
        return ""
    try:
        extractor = _TextExtractor()
        extractor.feed(html_str)
        # feed() buffers incomplete trailing data (for example text ending in
        # an unterminated "&..." reference) until more input arrives; close()
        # forces it through so the tail of the body is not silently dropped.
        extractor.close()
        return extractor.get_text()
    except Exception:
        # Best-effort fallback: never let a parser failure lose the message.
        return re.sub(r"<[^>]+>", " ", html_str).strip()
|
|
|
|
|
|
def _decode_part(part: Any) -> str:
    """Decode one message part's transfer-decoded payload to text.

    Uses the part's declared charset (UTF-8 when none is declared) with
    undecodable bytes replaced.  A charset label Python does not know is
    retried as UTF-8 instead of failing.

    Raises:
        ValueError: if the part has no decodable payload.
    """
    payload = part.get_payload(decode=True)
    if payload is None:
        raise ValueError("message part has no decodable payload")
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="replace")
    except LookupError:
        # Unknown or bogus charset label: keep the text rather than lose it.
        return payload.decode("utf-8", errors="replace")


def extract_body(msg: Any) -> str:
    """Return plain-text body. Strips HTML when no text/plain part exists.

    For a multipart message the first decodable ``text/plain`` part wins;
    otherwise the first decodable ``text/html`` part is converted with
    ``strip_html``.  Parts whose Content-Disposition is ``attachment`` are
    ignored so an attached text file is not mistaken for the body.  For a
    single-part message the payload is returned, HTML-stripped when its
    content type is ``text/html``.

    Args:
        msg: A message object exposing the ``email.message.Message`` API
            (``is_multipart``, ``walk``, ``get_content_type``,
            ``get_content_disposition``, ``get_content_charset``,
            ``get_payload``).

    Returns:
        The body text, or ``""`` when nothing could be decoded.
    """
    if not msg.is_multipart():
        try:
            text = _decode_part(msg)
            if msg.get_content_type() == "text/html":
                return strip_html(text)
            return text
        except Exception:
            # Best effort: an undecodable message yields an empty body.
            return ""

    html_fallback: str | None = None
    for part in msg.walk():
        if part.get_content_disposition() == "attachment":
            continue
        ct = part.get_content_type()
        if ct == "text/plain":
            try:
                return _decode_part(part)
            except Exception:
                # Unreadable part: keep looking for another candidate.
                continue
        if ct == "text/html" and html_fallback is None:
            try:
                html_fallback = strip_html(_decode_part(part))
            except Exception:
                continue
    return html_fallback or ""
|