avocet/app/utils.py

117 lines
4 KiB
Python

"""Shared email utility functions for Avocet.
Pure-stdlib helpers extracted from the retired label_tool.py Streamlit app.
These are reused by the FastAPI backend and the test suite.
"""
from __future__ import annotations
import json
import re
from html.parser import HTMLParser
from pathlib import Path
from typing import Any
# ── HTML → plain-text extractor ──────────────────────────────────────────────
class _TextExtractor(HTMLParser):
"""Extract visible text from an HTML email body, preserving line breaks."""
_BLOCK = {"p", "div", "br", "li", "tr", "h1", "h2", "h3", "h4", "h5", "h6", "blockquote"}
_SKIP = {"script", "style", "head", "noscript"}
def __init__(self):
super().__init__(convert_charrefs=True)
self._parts: list[str] = []
self._depth_skip = 0
def handle_starttag(self, tag, attrs):
tag = tag.lower()
if tag in self._SKIP:
self._depth_skip += 1
elif tag in self._BLOCK:
self._parts.append("\n")
def handle_endtag(self, tag):
if tag.lower() in self._SKIP:
self._depth_skip = max(0, self._depth_skip - 1)
def handle_data(self, data):
if not self._depth_skip:
self._parts.append(data)
def get_text(self) -> str:
text = "".join(self._parts)
lines = [ln.strip() for ln in text.splitlines()]
return "\n".join(ln for ln in lines if ln)
def strip_html(html_str: str) -> str:
"""Convert HTML email body to plain text. Pure stdlib, no dependencies."""
try:
extractor = _TextExtractor()
extractor.feed(html_str)
return extractor.get_text()
except Exception:
return re.sub(r"<[^>]+>", " ", html_str).strip()
def extract_body(msg: Any) -> str:
"""Return plain-text body. Strips HTML when no text/plain part exists."""
if msg.is_multipart():
html_fallback: str | None = None
for part in msg.walk():
ct = part.get_content_type()
if ct == "text/plain":
try:
charset = part.get_content_charset() or "utf-8"
return part.get_payload(decode=True).decode(charset, errors="replace")
except Exception:
pass
elif ct == "text/html" and html_fallback is None:
try:
charset = part.get_content_charset() or "utf-8"
raw = part.get_payload(decode=True).decode(charset, errors="replace")
html_fallback = strip_html(raw)
except Exception:
pass
return html_fallback or ""
else:
try:
charset = msg.get_content_charset() or "utf-8"
raw = msg.get_payload(decode=True).decode(charset, errors="replace")
if msg.get_content_type() == "text/html":
return strip_html(raw)
return raw
except Exception:
pass
return ""
def read_jsonl(path: Path) -> list[dict]:
"""Read a JSONL file, returning valid records. Skips blank lines and malformed JSON."""
if not path.exists():
return []
records: list[dict] = []
for line in path.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line:
continue
try:
records.append(json.loads(line))
except json.JSONDecodeError:
pass
return records
def write_jsonl(path: Path, records: list[dict]) -> None:
"""Write records to a JSONL file, overwriting any existing content."""
path.parent.mkdir(parents=True, exist_ok=True)
content = "\n".join(json.dumps(r) for r in records)
path.write_text(content + ("\n" if records else ""), encoding="utf-8")
def append_jsonl(path: Path, record: dict) -> None:
"""Append a single record to a JSONL file."""
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "a", encoding="utf-8") as fh:
fh.write(json.dumps(record) + "\n")