Alias domain as contact domain (#1689)

* Use the alias domain for contacts

* Check there are not duplicate emails

* Check also in trash

* Use helper

* Set VERP for the forward phase to the contact domain

* Add pgp_fingerprint as index for contacts

* Removed check trash

* Only use reply domains for sl domains

* Configure via db wether the domain can be used as a reverse_domain

* Fix: typo

* reverse logic

* fix migration

* fix test

---------

Co-authored-by: Adrià Casajús <adria.casajus@proton.ch>
Co-authored-by: Son <nguyenkims@users.noreply.github.com>
This commit is contained in:
Adrià Casajús 2023-04-20 12:14:53 +02:00 committed by GitHub
parent 7f23533c64
commit bec8cb2292
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 150 additions and 38 deletions

View File

@ -90,7 +90,7 @@ def create_contact(user: User, alias: Alias, contact_address: str) -> Contact:
alias_id=alias.id, alias_id=alias.id,
website_email=contact_email, website_email=contact_email,
name=contact_name, name=contact_name,
reply_email=generate_reply_email(contact_email, user), reply_email=generate_reply_email(contact_email, alias),
) )
LOG.d( LOG.d(

View File

@ -54,6 +54,7 @@ from app.models import (
IgnoreBounceSender, IgnoreBounceSender,
InvalidMailboxDomain, InvalidMailboxDomain,
VerpType, VerpType,
available_sl_email,
) )
from app.utils import ( from app.utils import (
random_string, random_string,
@ -1043,7 +1044,7 @@ def replace(msg: Union[Message, str], old, new) -> Union[Message, str]:
return msg return msg
def generate_reply_email(contact_email: str, user: User) -> str: def generate_reply_email(contact_email: str, alias: Alias) -> str:
""" """
generate a reply_email (aka reverse-alias), make sure it isn't used by any contact generate a reply_email (aka reverse-alias), make sure it isn't used by any contact
""" """
@ -1054,6 +1055,7 @@ def generate_reply_email(contact_email: str, user: User) -> str:
include_sender_in_reverse_alias = False include_sender_in_reverse_alias = False
user = alias.user
# user has set this option explicitly # user has set this option explicitly
if user.include_sender_in_reverse_alias is not None: if user.include_sender_in_reverse_alias is not None:
include_sender_in_reverse_alias = user.include_sender_in_reverse_alias include_sender_in_reverse_alias = user.include_sender_in_reverse_alias
@ -1068,6 +1070,12 @@ def generate_reply_email(contact_email: str, user: User) -> str:
contact_email = contact_email.replace(".", "_") contact_email = contact_email.replace(".", "_")
contact_email = convert_to_alphanumeric(contact_email) contact_email = convert_to_alphanumeric(contact_email)
reply_domain = config.EMAIL_DOMAIN
alias_domain = get_email_domain_part(alias.email)
sl_domain = SLDomain.get_by(domain=alias_domain)
if sl_domain and sl_domain.use_as_reverse_alias:
reply_domain = alias_domain
# not use while to avoid infinite loop # not use while to avoid infinite loop
for _ in range(1000): for _ in range(1000):
if include_sender_in_reverse_alias and contact_email: if include_sender_in_reverse_alias and contact_email:
@ -1075,15 +1083,15 @@ def generate_reply_email(contact_email: str, user: User) -> str:
reply_email = ( reply_email = (
# do not use the ra+ anymore # do not use the ra+ anymore
# f"ra+{contact_email}+{random_string(random_length)}@{config.EMAIL_DOMAIN}" # f"ra+{contact_email}+{random_string(random_length)}@{config.EMAIL_DOMAIN}"
f"{contact_email}_{random_string(random_length)}@{config.EMAIL_DOMAIN}" f"{contact_email}_{random_string(random_length)}@{reply_domain}"
) )
else: else:
random_length = random.randint(20, 50) random_length = random.randint(20, 50)
# do not use the ra+ anymore # do not use the ra+ anymore
# reply_email = f"ra+{random_string(random_length)}@{config.EMAIL_DOMAIN}" # reply_email = f"ra+{random_string(random_length)}@{config.EMAIL_DOMAIN}"
reply_email = f"{random_string(random_length)}@{config.EMAIL_DOMAIN}" reply_email = f"{random_string(random_length)}@{reply_domain}"
if not Contact.get_by(reply_email=reply_email): if available_sl_email(reply_email):
return reply_email return reply_email
raise Exception("Cannot generate reply email") raise Exception("Cannot generate reply email")

View File

@ -1298,16 +1298,30 @@ class OauthToken(Base, ModelMixin):
return self.expired < arrow.now() return self.expired < arrow.now()
def generate_email( def available_sl_email(email: str) -> bool:
if (
Alias.get_by(email=email)
or Contact.get_by(reply_email=email)
or DeletedAlias.get_by(email=email)
):
return False
return True
def generate_random_alias_email(
scheme: int = AliasGeneratorEnum.word.value, scheme: int = AliasGeneratorEnum.word.value,
in_hex: bool = False, in_hex: bool = False,
alias_domain=config.FIRST_ALIAS_DOMAIN, alias_domain: str = config.FIRST_ALIAS_DOMAIN,
retries: int = 10,
) -> str: ) -> str:
"""generate an email address that does not exist before """generate an email address that does not exist before
:param alias_domain: the domain used to generate the alias. :param alias_domain: the domain used to generate the alias.
:param scheme: int, value of AliasGeneratorEnum, indicate how the email is generated :param scheme: int, value of AliasGeneratorEnum, indicate how the email is generated
:param retries: int, How many times we can try to generate an alias in case of collision
:type in_hex: bool, if the generate scheme is uuid, is hex favorable? :type in_hex: bool, if the generate scheme is uuid, is hex favorable?
""" """
if retries <= 0:
raise Exception("Cannot generate alias after many retries")
if scheme == AliasGeneratorEnum.uuid.value: if scheme == AliasGeneratorEnum.uuid.value:
name = uuid.uuid4().hex if in_hex else uuid.uuid4().__str__() name = uuid.uuid4().hex if in_hex else uuid.uuid4().__str__()
random_email = name + "@" + alias_domain random_email = name + "@" + alias_domain
@ -1317,15 +1331,15 @@ def generate_email(
random_email = random_email.lower().strip() random_email = random_email.lower().strip()
# check that the client does not exist yet # check that the client does not exist yet
if not Alias.get_by(email=random_email) and not DeletedAlias.get_by( if available_sl_email(random_email):
email=random_email
):
LOG.d("generate email %s", random_email) LOG.d("generate email %s", random_email)
return random_email return random_email
# Rerun the function # Rerun the function
LOG.w("email %s already exists, generate a new email", random_email) LOG.w("email %s already exists, generate a new email", random_email)
return generate_email(scheme=scheme, in_hex=in_hex) return generate_random_alias_email(
scheme=scheme, in_hex=in_hex, retries=retries - 1
)
class Alias(Base, ModelMixin): class Alias(Base, ModelMixin):
@ -1524,7 +1538,7 @@ class Alias(Base, ModelMixin):
suffix = user.get_random_alias_suffix() suffix = user.get_random_alias_suffix()
email = f"{prefix}.{suffix}@{config.FIRST_ALIAS_DOMAIN}" email = f"{prefix}.{suffix}@{config.FIRST_ALIAS_DOMAIN}"
if not cls.get_by(email=email) and not DeletedAlias.get_by(email=email): if available_sl_email(email):
break break
return Alias.create( return Alias.create(
@ -1553,7 +1567,7 @@ class Alias(Base, ModelMixin):
if user.default_alias_custom_domain_id: if user.default_alias_custom_domain_id:
custom_domain = CustomDomain.get(user.default_alias_custom_domain_id) custom_domain = CustomDomain.get(user.default_alias_custom_domain_id)
random_email = generate_email( random_email = generate_random_alias_email(
scheme=scheme, in_hex=in_hex, alias_domain=custom_domain.domain scheme=scheme, in_hex=in_hex, alias_domain=custom_domain.domain
) )
elif user.default_alias_public_domain_id: elif user.default_alias_public_domain_id:
@ -1561,12 +1575,12 @@ class Alias(Base, ModelMixin):
if sl_domain.premium_only and not user.is_premium(): if sl_domain.premium_only and not user.is_premium():
LOG.w("%s not premium, cannot use %s", user, sl_domain) LOG.w("%s not premium, cannot use %s", user, sl_domain)
else: else:
random_email = generate_email( random_email = generate_random_alias_email(
scheme=scheme, in_hex=in_hex, alias_domain=sl_domain.domain scheme=scheme, in_hex=in_hex, alias_domain=sl_domain.domain
) )
if not random_email: if not random_email:
random_email = generate_email(scheme=scheme, in_hex=in_hex) random_email = generate_random_alias_email(scheme=scheme, in_hex=in_hex)
alias = Alias.create( alias = Alias.create(
user_id=user.id, user_id=user.id,
@ -2866,6 +2880,10 @@ class SLDomain(Base, ModelMixin):
# the order in which the domains are shown when user creates a custom alias # the order in which the domains are shown when user creates a custom alias
order = sa.Column(sa.Integer, nullable=False, default=0, server_default="0") order = sa.Column(sa.Integer, nullable=False, default=0, server_default="0")
use_as_reverse_alias = sa.Column(
sa.Boolean, nullable=False, default=False, server_default="0"
)
def __repr__(self): def __repr__(self):
return f"<SLDomain {self.domain} {'Premium' if self.premium_only else 'Free'}" return f"<SLDomain {self.domain} {'Premium' if self.premium_only else 'Free'}"

View File

@ -161,6 +161,7 @@ from app.models import (
MessageIDMatching, MessageIDMatching,
Notification, Notification,
VerpType, VerpType,
SLDomain,
) )
from app.pgp_utils import ( from app.pgp_utils import (
PGPException, PGPException,
@ -243,7 +244,7 @@ def get_or_create_contact(from_header: str, mail_from: str, alias: Alias) -> Con
website_email=contact_email, website_email=contact_email,
name=contact_name, name=contact_name,
mail_from=mail_from, mail_from=mail_from,
reply_email=generate_reply_email(contact_email, alias.user) reply_email=generate_reply_email(contact_email, alias)
if is_valid_email(contact_email) if is_valid_email(contact_email)
else NOREPLY, else NOREPLY,
automatic_created=True, automatic_created=True,
@ -304,7 +305,7 @@ def get_or_create_reply_to_contact(
alias_id=alias.id, alias_id=alias.id,
website_email=contact_address, website_email=contact_address,
name=contact_name, name=contact_name,
reply_email=generate_reply_email(contact_address, alias.user), reply_email=generate_reply_email(contact_address, alias),
automatic_created=True, automatic_created=True,
) )
Session.commit() Session.commit()
@ -372,7 +373,7 @@ def replace_header_when_forward(msg: Message, alias: Alias, header: str):
alias_id=alias.id, alias_id=alias.id,
website_email=contact_email, website_email=contact_email,
name=full_address.display_name, name=full_address.display_name,
reply_email=generate_reply_email(contact_email, alias.user), reply_email=generate_reply_email(contact_email, alias),
is_cc=header.lower() == "cc", is_cc=header.lower() == "cc",
automatic_created=True, automatic_created=True,
) )
@ -945,10 +946,11 @@ def forward_email_to_mailbox(
envelope.rcpt_options, envelope.rcpt_options,
) )
contact_domain = get_email_domain_part(contact.reply_email)
try: try:
sl_sendmail( sl_sendmail(
# use a different envelope sender for each forward (aka VERP) # use a different envelope sender for each forward (aka VERP)
generate_verp_email(VerpType.bounce_forward, email_log.id), generate_verp_email(VerpType.bounce_forward, email_log.id, contact_domain),
mailbox.email, mailbox.email,
msg, msg,
envelope.mail_options, envelope.mail_options,
@ -1017,8 +1019,11 @@ def handle_reply(envelope, msg: Message, rcpt_to: str) -> (bool, str):
reply_email = rcpt_to reply_email = rcpt_to
reply_domain = get_email_domain_part(reply_email)
# reply_email must end with EMAIL_DOMAIN # reply_email must end with EMAIL_DOMAIN
if not reply_email.endswith(EMAIL_DOMAIN): if not reply_email.endswith(EMAIL_DOMAIN) or not SLDomain.get_by(
domain=reply_domain
):
LOG.w(f"Reply email {reply_email} has wrong domain") LOG.w(f"Reply email {reply_email} has wrong domain")
return False, status.E501 return False, status.E501
@ -1032,7 +1037,7 @@ def handle_reply(envelope, msg: Message, rcpt_to: str) -> (bool, str):
alias = contact.alias alias = contact.alias
alias_address: str = contact.alias.email alias_address: str = contact.alias.email
alias_domain = alias_address[alias_address.find("@") + 1 :] alias_domain = get_email_domain_part(alias_address)
# Sanity check: verify alias domain is managed by SimpleLogin # Sanity check: verify alias domain is managed by SimpleLogin
# scenario: a user have removed a domain but due to a bug, the aliases are still there # scenario: a user have removed a domain but due to a bug, the aliases are still there

View File

@ -42,14 +42,16 @@ def add_sl_domains():
LOG.d("%s is already a SL domain", alias_domain) LOG.d("%s is already a SL domain", alias_domain)
else: else:
LOG.i("Add %s to SL domain", alias_domain) LOG.i("Add %s to SL domain", alias_domain)
SLDomain.create(domain=alias_domain) SLDomain.create(domain=alias_domain, use_as_reverse_alias=True)
for premium_domain in PREMIUM_ALIAS_DOMAINS: for premium_domain in PREMIUM_ALIAS_DOMAINS:
if SLDomain.get_by(domain=premium_domain): if SLDomain.get_by(domain=premium_domain):
LOG.d("%s is already a SL domain", premium_domain) LOG.d("%s is already a SL domain", premium_domain)
else: else:
LOG.i("Add %s to SL domain", premium_domain) LOG.i("Add %s to SL domain", premium_domain)
SLDomain.create(domain=premium_domain, premium_only=True) SLDomain.create(
domain=premium_domain, premium_only=True, use_as_reverse_alias=True
)
Session.commit() Session.commit()

View File

@ -0,0 +1,29 @@
"""empty message
Revision ID: 01e2997e90d3
Revises: 893c0d18475f
Create Date: 2023-04-19 16:09:11.851588
"""
import sqlalchemy_utils
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '01e2997e90d3'
down_revision = '893c0d18475f'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('public_domain', sa.Column('use_as_reverse_alias', sa.Boolean(), server_default='0', nullable=False))
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('public_domain', 'use_as_reverse_alias')
# ### end Alembic commands ###

View File

@ -0,0 +1,25 @@
"""empty message
Revision ID: 2634b41f54db
Revises: 01e2997e90d3, 2d89315ac650
Create Date: 2023-04-20 11:47:43.048536
"""
import sqlalchemy_utils
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '2634b41f54db'
down_revision = ('01e2997e90d3', '2d89315ac650')
branch_labels = None
depends_on = None
def upgrade():
pass
def downgrade():
pass

View File

@ -6,6 +6,7 @@ from email.utils import formataddr
import arrow import arrow
import pytest import pytest
from app import config
from app.config import MAX_ALERT_24H, EMAIL_DOMAIN, ROOT_DIR from app.config import MAX_ALERT_24H, EMAIL_DOMAIN, ROOT_DIR
from app.db import Session from app.db import Session
from app.email_utils import ( from app.email_utils import (
@ -48,6 +49,8 @@ from app.models import (
IgnoreBounceSender, IgnoreBounceSender,
InvalidMailboxDomain, InvalidMailboxDomain,
VerpType, VerpType,
AliasGeneratorEnum,
SLDomain,
) )
# flake8: noqa: E101, W191 # flake8: noqa: E101, W191
@ -469,33 +472,55 @@ def test_replace_str():
def test_generate_reply_email(flask_client): def test_generate_reply_email(flask_client):
user = create_new_user() user = create_new_user()
reply_email = generate_reply_email("test@example.org", user) alias = Alias.create_new_random(user, AliasGeneratorEnum.uuid.value)
assert reply_email.endswith(EMAIL_DOMAIN) Session.commit()
reply_email = generate_reply_email("test@example.org", alias)
domain = get_email_domain_part(alias.email)
assert reply_email.endswith(domain)
reply_email = generate_reply_email("", user) reply_email = generate_reply_email("", alias)
assert reply_email.endswith(EMAIL_DOMAIN) domain = get_email_domain_part(alias.email)
assert reply_email.endswith(domain)
def test_generate_reply_email_with_default_reply_domain(flask_client):
domain = SLDomain.create(domain=random_domain(), use_as_reverse_alias=False)
user = create_new_user()
alias = Alias.create(
user_id=user.id,
email=f"test@{domain.domain}",
mailbox_id=user.default_mailbox_id,
)
Session.commit()
reply_email = generate_reply_email("test@example.org", alias)
domain = get_email_domain_part(reply_email)
assert domain == config.EMAIL_DOMAIN
def test_generate_reply_email_include_sender_in_reverse_alias(flask_client): def test_generate_reply_email_include_sender_in_reverse_alias(flask_client):
# user enables include_sender_in_reverse_alias # user enables include_sender_in_reverse_alias
user = create_new_user() user = create_new_user()
alias = Alias.create_new_random(user, AliasGeneratorEnum.uuid.value)
Session.commit()
user.include_sender_in_reverse_alias = True user.include_sender_in_reverse_alias = True
reply_email = generate_reply_email("test@example.org", user) reply_email = generate_reply_email("test@example.org", alias)
assert reply_email.startswith("test_at_example_org") assert reply_email.startswith("test_at_example_org")
assert reply_email.endswith(EMAIL_DOMAIN) domain = get_email_domain_part(alias.email)
assert reply_email.endswith(domain)
reply_email = generate_reply_email("", user) reply_email = generate_reply_email("", alias)
assert reply_email.endswith(EMAIL_DOMAIN) domain = get_email_domain_part(alias.email)
assert reply_email.endswith(domain)
reply_email = generate_reply_email("👌汉字@example.org", user) reply_email = generate_reply_email("👌汉字@example.org", alias)
assert reply_email.startswith("yizi_at_example_org") assert reply_email.startswith("yizi_at_example_org")
# make sure reply_email only contain lowercase # make sure reply_email only contain lowercase
reply_email = generate_reply_email("TEST@example.org", user) reply_email = generate_reply_email("TEST@example.org", alias)
assert reply_email.startswith("test_at_example_org") assert reply_email.startswith("test_at_example_org")
reply_email = generate_reply_email("test.dot@example.org", user) reply_email = generate_reply_email("test.dot@example.org", alias)
assert reply_email.startswith("test_dot_at_example_org") assert reply_email.startswith("test_dot_at_example_org")

View File

@ -8,7 +8,7 @@ from app.config import EMAIL_DOMAIN, MAX_NB_EMAIL_FREE_PLAN, NOREPLY
from app.db import Session from app.db import Session
from app.email_utils import parse_full_address, generate_reply_email from app.email_utils import parse_full_address, generate_reply_email
from app.models import ( from app.models import (
generate_email, generate_random_alias_email,
Alias, Alias,
Contact, Contact,
Mailbox, Mailbox,
@ -22,13 +22,13 @@ from tests.utils import login, create_new_user, random_token
def test_generate_email(flask_client): def test_generate_email(flask_client):
email = generate_email() email = generate_random_alias_email()
assert email.endswith("@" + EMAIL_DOMAIN) assert email.endswith("@" + EMAIL_DOMAIN)
with pytest.raises(ValueError): with pytest.raises(ValueError):
UUID(email.split("@")[0], version=4) UUID(email.split("@")[0], version=4)
email_uuid = generate_email(scheme=2) email_uuid = generate_random_alias_email(scheme=2)
assert UUID(email_uuid.split("@")[0], version=4) assert UUID(email_uuid.split("@")[0], version=4)
@ -312,6 +312,6 @@ def test_create_contact_for_noreply(flask_client):
user_id=user.id, user_id=user.id,
alias_id=alias.id, alias_id=alias.id,
website_email=NOREPLY, website_email=NOREPLY,
reply_email=generate_reply_email(NOREPLY, user), reply_email=generate_reply_email(NOREPLY, alias),
) )
assert contact.website_email == NOREPLY assert contact.website_email == NOREPLY