fix: prefer HTML body in imap_sync, strip head/style/script, remove 4000-char truncation
Some checks failed
CI / test (pull_request) Failing after 25s

- _parse_message now prefers text/html over text/plain so digest emails
  retain href attribute values needed for link extraction
- Strip <head>, <style>, <script> blocks before storing to remove CSS/JS
  garbage while keeping anchor tags intact
- Remove [:4000] truncation — digest emails need full body for URL regex
- Update test: large body should NOT be truncated (assert len == 10_000)
This commit is contained in:
pyr0ball 2026-03-20 13:35:30 -07:00
parent 8c3c0340ff
commit b9ef1f631e
2 changed files with 31 additions and 9 deletions

View file

@ -698,21 +698,43 @@ def _parse_message(conn: imaplib.IMAP4, uid: bytes) -> Optional[dict]:
return None return None
msg = email.message_from_bytes(data[0][1]) msg = email.message_from_bytes(data[0][1])
body = "" # Prefer text/html (preserves href attributes for digest link extraction);
# fall back to text/plain if no HTML part exists.
html_body = ""
plain_body = ""
if msg.is_multipart(): if msg.is_multipart():
for part in msg.walk(): for part in msg.walk():
if part.get_content_type() == "text/plain": ct = part.get_content_type()
if ct == "text/html" and not html_body:
try: try:
body = part.get_payload(decode=True).decode("utf-8", errors="replace") html_body = part.get_payload(decode=True).decode("utf-8", errors="replace")
except Exception:
pass
elif ct == "text/plain" and not plain_body:
try:
plain_body = part.get_payload(decode=True).decode("utf-8", errors="replace")
except Exception: except Exception:
pass pass
break
else: else:
ct = msg.get_content_type()
try: try:
body = msg.get_payload(decode=True).decode("utf-8", errors="replace") raw = msg.get_payload(decode=True).decode("utf-8", errors="replace")
if ct == "text/html":
html_body = raw
else:
plain_body = raw
except Exception: except Exception:
pass pass
if html_body:
# Strip <head>…</head> (CSS, meta, title) plus any stray <style> and <script> blocks.
# Keeps <body> HTML intact so href attributes survive for digest extraction.
body = re.sub(r"<head[\s\S]*?</head>", "", html_body, flags=re.I)
body = re.sub(r"<style[\s\S]*?</style>", "", body, flags=re.I)
body = re.sub(r"<script[\s\S]*?</script>", "", body, flags=re.I)
else:
body = plain_body
mid = msg.get("Message-ID", "").strip() mid = msg.get("Message-ID", "").strip()
if not mid: if not mid:
return None # No Message-ID → can't dedup; skip to avoid repeat inserts return None # No Message-ID → can't dedup; skip to avoid repeat inserts
@ -723,7 +745,7 @@ def _parse_message(conn: imaplib.IMAP4, uid: bytes) -> Optional[dict]:
"from_addr": _decode_str(msg.get("From")), "from_addr": _decode_str(msg.get("From")),
"to_addr": _decode_str(msg.get("To")), "to_addr": _decode_str(msg.get("To")),
"date": _decode_str(msg.get("Date")), "date": _decode_str(msg.get("Date")),
"body": body[:4000], "body": body, # no truncation — digest emails need full content
} }
except Exception: except Exception:
return None return None

View file

@ -1024,8 +1024,8 @@ def test_sync_all_per_job_exception_continues(tmp_path):
# ── Performance / edge cases ────────────────────────────────────────────────── # ── Performance / edge cases ──────────────────────────────────────────────────
def test_parse_message_large_body_truncated(): def test_parse_message_large_body_not_truncated():
"""Body longer than 4000 chars is silently truncated to 4000.""" """Body longer than 4000 chars is stored in full (no truncation)."""
from scripts.imap_sync import _parse_message from scripts.imap_sync import _parse_message
big_body = ("x" * 10_000).encode() big_body = ("x" * 10_000).encode()
@ -1037,7 +1037,7 @@ def test_parse_message_large_body_truncated():
conn.fetch.return_value = ("OK", [(b"1 (RFC822)", raw)]) conn.fetch.return_value = ("OK", [(b"1 (RFC822)", raw)])
result = _parse_message(conn, b"1") result = _parse_message(conn, b"1")
assert result is not None assert result is not None
assert len(result["body"]) <= 4000 assert len(result["body"]) == 10_000
def test_parse_message_binary_attachment_no_crash(): def test_parse_message_binary_attachment_no_crash():