Create sl_formataddr to handle unicode for built-in formataddr (#1265)

* Create sl_formataddr to handle unicode for built-in formataddr

* fix circular import
This commit is contained in:
Son Nguyen Kim 2022-09-05 08:40:24 +02:00 committed by GitHub
parent 48127914c2
commit 313a928070
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 29 additions and 12 deletions

View file

@ -14,7 +14,7 @@ from email.header import decode_header, Header
from email.message import Message, EmailMessage
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import make_msgid, formatdate
from email.utils import make_msgid, formatdate, formataddr
from smtplib import SMTP, SMTPException
from typing import Tuple, List, Optional, Union
@ -1463,3 +1463,9 @@ def get_verp_info_from_email(email: str) -> Optional[Tuple[VerpType, int]]:
if data[2] > (time.time() + VERP_MESSAGE_LIFETIME - VERP_TIME_START) / 60:
return None
return VerpType(data[0]), data[1]
def sl_formataddr(name_address_tuple: Tuple[str, str]):
"""Same as formataddr but use utf-8 encoding by default"""
name, addr = name_address_tuple
return formataddr((name, Header(addr, "utf-8")))

View file

@ -8,7 +8,6 @@ import os
import random
import secrets
import uuid
from email.utils import formataddr
from typing import List, Tuple, Optional, Union
import arrow
@ -27,8 +26,8 @@ from sqlalchemy.orm import deferred
from sqlalchemy.sql import and_
from sqlalchemy_utils import ArrowType
from app import s3
from app import config
from app import s3
from app.db import Session
from app.errors import (
AliasInTrashError,
@ -1807,7 +1806,9 @@ class Contact(Base, ModelMixin):
else formatted_email
)
new_addr = formataddr((new_name, self.reply_email)).strip()
from app.email_utils import sl_formataddr
new_addr = sl_formataddr((new_name, self.reply_email)).strip()
return new_addr.strip()
def last_reply(self) -> "EmailLog":

View file

@ -39,7 +39,7 @@ from email.encoders import encode_noop
from email.message import Message
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.utils import formataddr, make_msgid, formatdate, getaddresses
from email.utils import make_msgid, formatdate, getaddresses
from io import BytesIO
from smtplib import SMTPRecipientsRefused, SMTPServerDisconnected
from typing import List, Tuple, Optional
@ -122,6 +122,7 @@ from app.email_utils import (
save_envelope_for_debugging,
get_verp_info_from_email,
generate_verp_email,
sl_formataddr,
)
from app.errors import (
NonReverseAliasInReplyPhase,
@ -134,14 +135,14 @@ from app.handler.dmarc import (
apply_dmarc_policy_for_reply_phase,
apply_dmarc_policy_for_forward_phase,
)
from app.handler.spamd_result import (
SpamdResult,
SPFCheckResult,
)
from app.handler.provider_complaint import (
handle_hotmail_complaint,
handle_yahoo_complaint,
)
from app.handler.spamd_result import (
SpamdResult,
SPFCheckResult,
)
from app.handler.unsubscribe_generator import UnsubscribeGenerator
from app.handler.unsubscribe_handler import UnsubscribeHandler
from app.log import LOG, set_message_id
@ -445,7 +446,7 @@ def replace_header_when_reply(msg: Message, alias: Alias, header: str):
# still keep this email in header
# new_addrs.append(reply_email)
else:
new_addrs.append(formataddr((contact.name, contact.website_email)))
new_addrs.append(sl_formataddr((contact.name, contact.website_email)))
if new_addrs:
new_header = ",".join(new_addrs)
@ -1156,7 +1157,7 @@ def handle_reply(envelope, msg: Message, rcpt_to: str) -> (bool, str):
# add alias name from alias
if alias.name:
LOG.d("Put alias name %s in from header", alias.name)
from_header = formataddr((alias.name, alias.email))
from_header = sl_formataddr((alias.name, alias.email))
elif alias.custom_domain:
# add alias name from domain
if alias.custom_domain.name:
@ -1164,7 +1165,7 @@ def handle_reply(envelope, msg: Message, rcpt_to: str) -> (bool, str):
"Put domain default alias name %s in from header",
alias.custom_domain.name,
)
from_header = formataddr((alias.custom_domain.name, alias.email))
from_header = sl_formataddr((alias.custom_domain.name, alias.email))
LOG.d("From header is %s", from_header)
add_or_replace_header(msg, headers.FROM, from_header)

View file

@ -1,6 +1,7 @@
import email
import os
from email.message import EmailMessage
from email.utils import formataddr
import arrow
import pytest
@ -37,6 +38,7 @@ from app.email_utils import (
is_invalid_mailbox_domain,
generate_verp_email,
get_verp_info_from_email,
sl_formataddr,
)
from app.models import (
CustomDomain,
@ -777,3 +779,10 @@ def test_add_header_multipart_with_invalid_part():
assert part.get_payload().index("INJECT") > -1
else:
assert part == "invalid"
def test_sl_formataddr():
assert sl_formataddr(("é", "è@ç.à")) == "=?utf-8?b?w6k=?= <è@ç.à>"
# test that the same name-address can't be handled by the built-in formataddr
with pytest.raises(UnicodeEncodeError):
formataddr(("é", "è@ç.à"))