replace parseaddr_unicode by parse_full_address
This commit is contained in:
parent
638e8137ec
commit
41478a5715
|
@ -7,12 +7,11 @@ import random
|
||||||
import re
|
import re
|
||||||
import time
|
import time
|
||||||
from copy import deepcopy
|
from copy import deepcopy
|
||||||
from email.errors import HeaderParseError
|
|
||||||
from email.header import decode_header, Header
|
from email.header import decode_header, Header
|
||||||
from email.message import Message
|
from email.message import Message
|
||||||
from email.mime.multipart import MIMEMultipart
|
from email.mime.multipart import MIMEMultipart
|
||||||
from email.mime.text import MIMEText
|
from email.mime.text import MIMEText
|
||||||
from email.utils import make_msgid, formatdate, parseaddr
|
from email.utils import make_msgid, formatdate
|
||||||
from smtplib import SMTP, SMTPServerDisconnected
|
from smtplib import SMTP, SMTPServerDisconnected
|
||||||
from typing import Tuple, List, Optional, Union
|
from typing import Tuple, List, Optional, Union
|
||||||
|
|
||||||
|
@ -20,6 +19,8 @@ import arrow
|
||||||
import dkim
|
import dkim
|
||||||
import spf
|
import spf
|
||||||
from email_validator import validate_email, EmailNotValidError
|
from email_validator import validate_email, EmailNotValidError
|
||||||
|
from flanker.addresslib import address
|
||||||
|
from flanker.addresslib.address import EmailAddress
|
||||||
from jinja2 import Environment, FileSystemLoader
|
from jinja2 import Environment, FileSystemLoader
|
||||||
from sqlalchemy import func
|
from sqlalchemy import func
|
||||||
|
|
||||||
|
@ -681,47 +682,6 @@ def get_header_unicode(header: Union[str, Header]) -> str:
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
|
|
||||||
def parseaddr_unicode(addr) -> (str, str):
|
|
||||||
"""Like parseaddr() but return name in unicode instead of in RFC 2047 format
|
|
||||||
Should be used instead of parseaddr()
|
|
||||||
'=?UTF-8?B?TmjGoW4gTmd1eeG7hW4=?= <abcd@gmail.com>' -> ('Nhơn Nguyễn', "abcd@gmail.com")
|
|
||||||
"""
|
|
||||||
# sometimes linebreaks are present in addr
|
|
||||||
addr = addr.replace("\n", "").strip()
|
|
||||||
name, email = parseaddr(addr)
|
|
||||||
# email can have whitespace so we can't remove whitespace here
|
|
||||||
email = email.strip().lower()
|
|
||||||
if name:
|
|
||||||
name = name.strip()
|
|
||||||
try:
|
|
||||||
decoded_string, charset = decode_header(name)[0]
|
|
||||||
except HeaderParseError: # fail in case
|
|
||||||
LOG.w("Can't decode name %s", name)
|
|
||||||
else:
|
|
||||||
if charset is not None:
|
|
||||||
try:
|
|
||||||
name = decoded_string.decode(charset)
|
|
||||||
except UnicodeDecodeError:
|
|
||||||
LOG.w("Cannot decode addr name %s", name)
|
|
||||||
name = ""
|
|
||||||
except LookupError: # charset is unknown
|
|
||||||
LOG.w(
|
|
||||||
"Cannot decode %s with %s, use utf-8", decoded_string, charset
|
|
||||||
)
|
|
||||||
try:
|
|
||||||
name = decoded_string.decode("utf-8")
|
|
||||||
except UnicodeDecodeError:
|
|
||||||
LOG.w("utf-8 not work on %s", decoded_string)
|
|
||||||
name = ""
|
|
||||||
|
|
||||||
else:
|
|
||||||
name = decoded_string
|
|
||||||
|
|
||||||
if type(name) == bytes:
|
|
||||||
name = name.decode()
|
|
||||||
return name, email
|
|
||||||
|
|
||||||
|
|
||||||
def copy(msg: Message) -> Message:
|
def copy(msg: Message) -> Message:
|
||||||
"""return a copy of message"""
|
"""return a copy of message"""
|
||||||
try:
|
try:
|
||||||
|
@ -1257,3 +1217,17 @@ def should_ignore_bounce(mail_from: str) -> bool:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def parse_full_address(full_address) -> (str, str):
|
||||||
|
"""
|
||||||
|
parse the email address full format and return the display name and address
|
||||||
|
For ex: ab <cd@xy.com> -> (ab, cd@xy.com)
|
||||||
|
'=?UTF-8?B?TmjGoW4gTmd1eeG7hW4=?= <abcd@gmail.com>' -> ('Nhơn Nguyễn', "abcd@gmail.com")
|
||||||
|
|
||||||
|
If the parsing fails, raise ValueError
|
||||||
|
"""
|
||||||
|
full_address: EmailAddress = address.parse(full_address)
|
||||||
|
if full_address is None:
|
||||||
|
raise ValueError
|
||||||
|
return full_address.display_name, full_address.address
|
||||||
|
|
|
@ -10,7 +10,6 @@ from app.email_utils import (
|
||||||
email_can_be_used_as_mailbox,
|
email_can_be_used_as_mailbox,
|
||||||
delete_header,
|
delete_header,
|
||||||
add_or_replace_header,
|
add_or_replace_header,
|
||||||
parseaddr_unicode,
|
|
||||||
send_email_with_rate_control,
|
send_email_with_rate_control,
|
||||||
copy,
|
copy,
|
||||||
get_spam_from_header,
|
get_spam_from_header,
|
||||||
|
@ -30,6 +29,7 @@ from app.email_utils import (
|
||||||
get_queue_id,
|
get_queue_id,
|
||||||
should_ignore_bounce,
|
should_ignore_bounce,
|
||||||
get_header_unicode,
|
get_header_unicode,
|
||||||
|
parse_full_address,
|
||||||
)
|
)
|
||||||
from app.extensions import db
|
from app.extensions import db
|
||||||
from app.models import User, CustomDomain, Alias, Contact, EmailLog, IgnoreBounceSender
|
from app.models import User, CustomDomain, Alias, Contact, EmailLog, IgnoreBounceSender
|
||||||
|
@ -100,43 +100,37 @@ def test_add_or_replace_header():
|
||||||
assert msg._headers == [("H", "new")]
|
assert msg._headers == [("H", "new")]
|
||||||
|
|
||||||
|
|
||||||
def test_parseaddr_unicode():
|
def test_parse_full_address():
|
||||||
# only email
|
# only email
|
||||||
assert parseaddr_unicode("abcd@gmail.com") == (
|
assert parse_full_address("abcd@gmail.com") == (
|
||||||
"",
|
"",
|
||||||
"abcd@gmail.com",
|
"abcd@gmail.com",
|
||||||
)
|
)
|
||||||
|
|
||||||
# ascii address
|
# ascii address
|
||||||
assert parseaddr_unicode("First Last <abcd@gmail.com>") == (
|
assert parse_full_address("First Last <abcd@gmail.com>") == (
|
||||||
"First Last",
|
"First Last",
|
||||||
"abcd@gmail.com",
|
"abcd@gmail.com",
|
||||||
)
|
)
|
||||||
|
|
||||||
# Handle quote
|
# Handle quote
|
||||||
assert parseaddr_unicode('"First Last" <abcd@gmail.com>') == (
|
assert parse_full_address('"First Last" <abcd@gmail.com>') == (
|
||||||
"First Last",
|
"First Last",
|
||||||
"abcd@gmail.com",
|
"abcd@gmail.com",
|
||||||
)
|
)
|
||||||
|
|
||||||
# UTF-8 charset
|
# UTF-8 charset
|
||||||
assert parseaddr_unicode("=?UTF-8?B?TmjGoW4gTmd1eeG7hW4=?= <abcd@gmail.com>") == (
|
assert parse_full_address("=?UTF-8?B?TmjGoW4gTmd1eeG7hW4=?= <abcd@gmail.com>") == (
|
||||||
"Nhơn Nguyễn",
|
"Nhơn Nguyễn",
|
||||||
"abcd@gmail.com",
|
"abcd@gmail.com",
|
||||||
)
|
)
|
||||||
|
|
||||||
# iso-8859-1 charset
|
# iso-8859-1 charset
|
||||||
assert parseaddr_unicode("=?iso-8859-1?q?p=F6stal?= <abcd@gmail.com>") == (
|
assert parse_full_address("=?iso-8859-1?q?p=F6stal?= <abcd@gmail.com>") == (
|
||||||
"pöstal",
|
"pöstal",
|
||||||
"abcd@gmail.com",
|
"abcd@gmail.com",
|
||||||
)
|
)
|
||||||
|
|
||||||
# when a name can't be decoded, return an empty string
|
|
||||||
assert parseaddr_unicode("=?UTF-8?B?Cec<65><63><EFBFBD>?= <test@example.com>") == (
|
|
||||||
"",
|
|
||||||
"test@example.com",
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def test_send_email_with_rate_control(flask_client):
|
def test_send_email_with_rate_control(flask_client):
|
||||||
user = User.create(
|
user = User.create(
|
||||||
|
|
|
@ -3,7 +3,7 @@ from uuid import UUID
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from app.config import EMAIL_DOMAIN, MAX_NB_EMAIL_FREE_PLAN
|
from app.config import EMAIL_DOMAIN, MAX_NB_EMAIL_FREE_PLAN
|
||||||
from app.email_utils import parseaddr_unicode
|
from app.email_utils import parse_full_address
|
||||||
from app.extensions import db
|
from app.extensions import db
|
||||||
from app.models import (
|
from app.models import (
|
||||||
generate_email,
|
generate_email,
|
||||||
|
@ -159,8 +159,8 @@ def test_new_addr(flask_client):
|
||||||
== "=?utf-8?q?Nh=C6=A1n_Nguy=E1=BB=85n_-_abcd_at_example=2Ecom?= <rep@SL>"
|
== "=?utf-8?q?Nh=C6=A1n_Nguy=E1=BB=85n_-_abcd_at_example=2Ecom?= <rep@SL>"
|
||||||
)
|
)
|
||||||
|
|
||||||
# sanity check for parseaddr_unicode
|
# sanity check
|
||||||
assert parseaddr_unicode(c1.new_addr()) == (
|
assert parse_full_address(c1.new_addr()) == (
|
||||||
"Nhơn Nguyễn - abcd at example.com",
|
"Nhơn Nguyễn - abcd at example.com",
|
||||||
"rep@sl",
|
"rep@sl",
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in a new issue