Compare commits

...

10 commits

Author SHA1 Message Date
Adrià Casajús a5e0623289
Merge fixes 2022-04-28 12:48:45 +02:00
Adrià Casajús c522652367
Merge remote-tracking branch 'origin/master' into ac-check-create-alias
* origin/master:
  not send emails to inform about an alias can't be created to disabled user
  prevent disabled user from using the api
  make sure disabled user can't create new alias
  move help to menu on small screen
  only show the help button on desktop
  use another logo for mobile
2022-04-28 12:44:30 +02:00
Adrià Casajús 5170d3371c
Pass notify user flag 2022-04-28 09:40:35 +02:00
Adrià Casajús 3655c60c3c
Return user instead of boolean 2022-04-28 07:49:53 +02:00
Adrià Casajús ff734aaa06
Allow sending emails in a background thread 2022-04-27 18:45:19 +02:00
Adrià Casajús 90fa4acad9
Restore sending notification to user when no alias can be created 2022-04-27 16:06:27 +02:00
Adrià Casajús 2c26ced375
Assert properly 2022-04-27 15:34:30 +02:00
Adrià Casajús 5157decd58
Removed unused import 2022-04-27 15:26:32 +02:00
Adrià Casajús 35e11bced8
format 2022-04-27 15:21:56 +02:00
Adrià Casajús ea99a60ca6
Add methods to check if an alias will be auto-generated 2022-04-27 15:18:34 +02:00
10 changed files with 487 additions and 238 deletions

View file

@ -1,18 +1,23 @@
import re
from typing import Optional
from typing import Optional, Tuple
from email_validator import validate_email, EmailNotValidError
from sqlalchemy.exc import IntegrityError, DataError
from app.config import BOUNCE_PREFIX_FOR_REPLY_PHASE, BOUNCE_PREFIX, BOUNCE_SUFFIX
from app.config import (
BOUNCE_PREFIX_FOR_REPLY_PHASE,
BOUNCE_PREFIX,
BOUNCE_SUFFIX,
VERP_PREFIX,
)
from app.db import Session
from app.email_utils import (
get_email_domain_part,
send_cannot_create_directory_alias,
send_cannot_create_domain_alias,
can_create_directory_for_address,
send_cannot_create_directory_alias_disabled,
get_email_local_part,
send_cannot_create_domain_alias,
)
from app.errors import AliasInTrashError
from app.log import LOG
@ -27,10 +32,131 @@ from app.models import (
Mailbox,
EmailLog,
Contact,
AutoCreateRule,
)
from app.regex_utils import regex_match
def get_user_if_alias_would_auto_create(
address: str, notify_user: bool = False
) -> Optional[User]:
banned_prefix = f"{VERP_PREFIX}."
if address.startswith(banned_prefix):
LOG.w("alias %s can't start with %s", address, banned_prefix)
return None
try:
# Prevent addresses with unicode characters (🤯) in them for now.
validate_email(address, check_deliverability=False, allow_smtputf8=False)
except EmailNotValidError:
return None
will_create = check_if_alias_can_be_auto_created_for_custom_domain(
address, notify_user=notify_user
)
if will_create:
return will_create[0].user
directory = check_if_alias_can_be_auto_created_for_a_directory(
address, notify_user=notify_user
)
if directory:
return directory.user
return None
def check_if_alias_can_be_auto_created_for_custom_domain(
address: str, notify_user: bool = True
) -> Optional[Tuple[CustomDomain, Optional[AutoCreateRule]]]:
"""
Check if this address would generate an auto created alias.
If that's the case return the domain that would create it and the rule that triggered it.
If there's no rule it's a catchall creation
"""
alias_domain = get_email_domain_part(address)
custom_domain: CustomDomain = CustomDomain.get_by(domain=alias_domain)
if not custom_domain:
return None
user: User = custom_domain.user
if user.disabled:
LOG.i("Disabled user %s can't create new alias via custom domain", user)
return None
if not user.can_create_new_alias():
if notify_user:
send_cannot_create_domain_alias(custom_domain.user, address, alias_domain)
return None
if not custom_domain.catch_all:
if len(custom_domain.auto_create_rules) == 0:
return None
local = get_email_local_part(address)
for rule in custom_domain.auto_create_rules:
if regex_match(rule.regex, local):
LOG.d(
"%s passes %s on %s",
address,
rule.regex,
custom_domain,
)
return custom_domain, rule
else: # no rule passes
LOG.d("no rule passed to create %s", local)
return None
LOG.d("Create alias via catchall")
return custom_domain, None
def check_if_alias_can_be_auto_created_for_a_directory(
address: str, notify_user: bool = True
) -> Optional[Directory]:
"""
Try to create an alias with directory
"""
# check if alias belongs to a directory, ie having directory/anything@EMAIL_DOMAIN format
if not can_create_directory_for_address(address):
return None
# alias contains one of the 3 special directory separator: "/", "+" or "#"
if "/" in address:
sep = "/"
elif "+" in address:
sep = "+"
elif "#" in address:
sep = "#"
else:
# if there's no directory separator in the alias, no way to auto-create it
return None
directory_name = address[: address.find(sep)]
LOG.d("directory_name %s", directory_name)
directory = Directory.get_by(name=directory_name)
if not directory:
return None
user: User = directory.user
if user.disabled:
LOG.i("Disabled %s can't create new alias with directory", user)
return None
if not user.can_create_new_alias():
if notify_user:
send_cannot_create_directory_alias(user, address, directory_name)
return None
if directory.disabled:
if notify_user:
send_cannot_create_directory_alias_disabled(user, address, directory_name)
return None
return directory
def try_auto_create(address: str) -> Optional[Alias]:
"""Try to auto-create the alias using directory or catch-all domain"""
# VERP for reply phase is {BOUNCE_PREFIX_FOR_REPLY_PHASE}+{email_log.id}+@{alias_domain}
@ -60,124 +186,72 @@ def try_auto_create_directory(address: str) -> Optional[Alias]:
"""
Try to create an alias with directory
"""
# check if alias belongs to a directory, ie having directory/anything@EMAIL_DOMAIN format
if can_create_directory_for_address(address):
# if there's no directory separator in the alias, no way to auto-create it
if "/" not in address and "+" not in address and "#" not in address:
return None
directory = check_if_alias_can_be_auto_created_for_a_directory(
address, notify_user=True
)
if not directory:
return None
# alias contains one of the 3 special directory separator: "/", "+" or "#"
if "/" in address:
sep = "/"
elif "+" in address:
sep = "+"
else:
sep = "#"
try:
LOG.d("create alias %s for directory %s", address, directory)
directory_name = address[: address.find(sep)]
LOG.d("directory_name %s", directory_name)
mailboxes = directory.mailboxes
directory = Directory.get_by(name=directory_name)
if not directory:
return None
user: User = directory.user
if user.disabled:
LOG.i("Disabled %s can't create new alias with directory", user)
return None
if not user.can_create_new_alias():
send_cannot_create_directory_alias(user, address, directory_name)
return None
if directory.disabled:
send_cannot_create_directory_alias_disabled(user, address, directory_name)
return None
try:
LOG.d("create alias %s for directory %s", address, directory)
mailboxes = directory.mailboxes
alias = Alias.create(
email=address,
user_id=directory.user_id,
directory_id=directory.id,
mailbox_id=mailboxes[0].id,
alias = Alias.create(
email=address,
user_id=directory.user_id,
directory_id=directory.id,
mailbox_id=mailboxes[0].id,
)
if not directory.user.disable_automatic_alias_note:
alias.note = f"Created by directory {directory.name}"
Session.flush()
for i in range(1, len(mailboxes)):
AliasMailbox.create(
alias_id=alias.id,
mailbox_id=mailboxes[i].id,
)
if not user.disable_automatic_alias_note:
alias.note = f"Created by directory {directory.name}"
Session.flush()
for i in range(1, len(mailboxes)):
AliasMailbox.create(
alias_id=alias.id,
mailbox_id=mailboxes[i].id,
)
Session.commit()
return alias
except AliasInTrashError:
LOG.w(
"Alias %s was deleted before, cannot auto-create using directory %s, user %s",
address,
directory_name,
user,
)
return None
except IntegrityError:
LOG.w("Alias %s already exists", address)
Session.rollback()
alias = Alias.get_by(email=address)
return alias
Session.commit()
return alias
except AliasInTrashError:
LOG.w(
"Alias %s was deleted before, cannot auto-create using directory %s, user %s",
address,
directory.name,
directory.user,
)
return None
except IntegrityError:
LOG.w("Alias %s already exists", address)
Session.rollback()
alias = Alias.get_by(email=address)
return alias
def try_auto_create_via_domain(address: str) -> Optional[Alias]:
"""Try to create an alias with catch-all or auto-create rules on custom domain"""
# try to create alias on-the-fly with custom-domain catch-all feature
# check if alias is custom-domain alias and if the custom-domain has catch-all enabled
alias_domain = get_email_domain_part(address)
custom_domain: CustomDomain = CustomDomain.get_by(domain=alias_domain)
if not custom_domain:
can_create = check_if_alias_can_be_auto_created_for_custom_domain(address)
if not can_create:
return None
custom_domain, rule = can_create
domain_user: User = custom_domain.user
if domain_user.disabled:
LOG.i("Disabled user %s can't create new alias via custom domain", domain_user)
return None
if not custom_domain.catch_all and len(custom_domain.auto_create_rules) == 0:
return None
elif not custom_domain.catch_all and len(custom_domain.auto_create_rules) > 0:
local = get_email_local_part(address)
for rule in custom_domain.auto_create_rules:
if regex_match(rule.regex, local):
LOG.d(
"%s passes %s on %s",
address,
rule.regex,
custom_domain,
)
alias_note = f"Created by rule {rule.order} with regex {rule.regex}"
mailboxes = rule.mailboxes
break
else: # no rule passes
LOG.d("no rule passed to create %s", local)
return
else: # catch-all is enabled
if rule:
alias_note = f"Created by rule {rule.order} with regex {rule.regex}"
mailboxes = rule.mailboxes
else:
alias_note = "Created by catchall option"
mailboxes = custom_domain.mailboxes
alias_note = "Created by catch-all option"
if not domain_user.can_create_new_alias():
send_cannot_create_domain_alias(domain_user, address, alias_domain)
return None
# a rule can have 0 mailboxes. Happened when a mailbox is deleted
if not mailboxes:
LOG.d("use %s default mailbox for %s %s", domain_user, address, custom_domain)
mailboxes = [domain_user.default_mailbox]
LOG.d(
"use %s default mailbox for %s %s",
custom_domain.user,
address,
custom_domain,
)
mailboxes = [custom_domain.user.default_mailbox]
try:
LOG.d("create alias %s for domain %s", address, custom_domain)
@ -203,7 +277,7 @@ def try_auto_create_via_domain(address: str) -> Optional[Alias]:
"Alias %s was deleted before, cannot auto-create using domain catch-all %s, user %s",
address,
custom_domain,
domain_user,
custom_domain.user,
)
return None
except IntegrityError:

View file

@ -5,14 +5,14 @@ from email.message import Message
import aiospamc
from app.config import SPAMASSASSIN_HOST
from app.email_utils import to_bytes
from app.log import LOG
from app.message_utils import message_to_bytes
from app.models import EmailLog
from app.spamassassin_utils import SpamAssassin
async def get_spam_score_async(message: Message) -> float:
sa_input = to_bytes(message)
sa_input = message_to_bytes(message)
# Spamassassin requires to have an ending linebreak
if not sa_input.endswith(b"\n"):
@ -41,7 +41,7 @@ def get_spam_score(
Return the spam score and spam report
"""
LOG.d("get spam score for %s", email_log)
sa_input = to_bytes(message)
sa_input = message_to_bytes(message)
# Spamassassin requires to have an ending linebreak
if not sa_input.endswith(b"\n"):

View file

@ -14,12 +14,11 @@ from email.message import Message, EmailMessage
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import make_msgid, formatdate
from smtplib import SMTP, SMTPServerDisconnected, SMTPException, SMTPRecipientsRefused
from smtplib import SMTP, SMTPException
from typing import Tuple, List, Optional, Union
import arrow
import dkim
import newrelic.agent
import re2 as re
import spf
from aiosmtpd.smtp import Envelope
@ -63,6 +62,8 @@ from app.db import Session
from app.dns_utils import get_mx_domains
from app.email import headers
from app.log import LOG
from app.mail_sender import sl_sendmail
from app.message_utils import message_to_bytes
from app.models import (
Mailbox,
User,
@ -477,7 +478,7 @@ def add_dkim_signature_with_header(
# Generate message signature
if DKIM_PRIVATE_KEY:
sig = dkim.sign(
to_bytes(msg),
message_to_bytes(msg),
DKIM_SELECTOR,
email_domain.encode(),
DKIM_PRIVATE_KEY.encode(),
@ -836,7 +837,7 @@ def copy(msg: Message) -> Message:
return message_from_string(msg.as_string())
except (UnicodeEncodeError, LookupError):
LOG.w("as_string() fails, try bytes parsing")
return message_from_bytes(to_bytes(msg))
return message_from_bytes(message_to_bytes(msg))
def to_bytes(msg: Message):
@ -1307,82 +1308,6 @@ def get_smtp_server():
return smtp
def sl_sendmail(
from_addr,
to_addr,
msg: Message,
mail_options=(),
rcpt_options=(),
is_forward: bool = False,
retries=2,
ignore_smtp_error=False,
):
"""replace smtp.sendmail"""
if NOT_SEND_EMAIL:
LOG.d(
"send email with subject '%s', from '%s' to '%s'",
msg[headers.SUBJECT],
msg[headers.FROM],
msg[headers.TO],
)
return
try:
start = time.time()
if POSTFIX_SUBMISSION_TLS:
smtp_port = 587
else:
smtp_port = POSTFIX_PORT
with SMTP(POSTFIX_SERVER, smtp_port) as smtp:
if POSTFIX_SUBMISSION_TLS:
smtp.starttls()
elapsed = time.time() - start
LOG.d("getting a smtp connection takes seconds %s", elapsed)
newrelic.agent.record_custom_metric("Custom/smtp_connection_time", elapsed)
# smtp.send_message has UnicodeEncodeError
# encode message raw directly instead
LOG.d(
"Sendmail mail_from:%s, rcpt_to:%s, header_from:%s, header_to:%s, header_cc:%s",
from_addr,
to_addr,
msg[headers.FROM],
msg[headers.TO],
msg[headers.CC],
)
smtp.sendmail(
from_addr,
to_addr,
to_bytes(msg),
mail_options,
rcpt_options,
)
except (SMTPServerDisconnected, SMTPRecipientsRefused) as e:
if retries > 0:
LOG.w(
"SMTPServerDisconnected or SMTPRecipientsRefused error %s, retry",
e,
exc_info=True,
)
time.sleep(0.3 * retries)
sl_sendmail(
from_addr,
to_addr,
msg,
mail_options,
rcpt_options,
is_forward,
retries=retries - 1,
)
else:
if ignore_smtp_error:
LOG.w("Ignore smtp error %s", e)
else:
raise
def get_queue_id(msg: Message) -> Optional[str]:
"""Get the Postfix queue-id from a message"""
header_values = msg.get_all(headers.RSPAMD_QUEUE_ID)

View file

@ -17,11 +17,11 @@ from app.email_utils import (
send_email_with_rate_control,
render,
add_or_replace_header,
to_bytes,
add_header,
)
from app.handler.spamd_result import SpamdResult, Phase, DmarcCheckResult
from app.log import LOG
from app.message_utils import message_to_bytes
from app.models import Alias, Contact, Notification, EmailLog, RefusedEmail
@ -102,7 +102,7 @@ def quarantine_dmarc_failed_forward_email(alias, contact, envelope, msg) -> Emai
random_name = str(uuid.uuid4())
s3_report_path = f"refused-emails/full-{random_name}.eml"
s3.upload_email_from_bytesio(
s3_report_path, BytesIO(to_bytes(msg)), f"full-{random_name}"
s3_report_path, BytesIO(message_to_bytes(msg)), f"full-{random_name}"
)
refused_email = RefusedEmail.create(
full_report_path=s3_report_path, user_id=alias.user_id, flush=True

131
app/mail_sender.py Normal file
View file

@ -0,0 +1,131 @@
import time
from concurrent.futures import ThreadPoolExecutor
from mailbox import Message
from smtplib import SMTP, SMTPServerDisconnected, SMTPRecipientsRefused
from typing import Optional, Dict
import newrelic
from attr import dataclass
from app.config import (
NOT_SEND_EMAIL,
POSTFIX_SUBMISSION_TLS,
POSTFIX_PORT,
POSTFIX_SERVER,
)
from app.email import headers
from app.log import LOG
from app.message_utils import message_to_bytes
@dataclass
class SendRequest:
from_address: str
to_address: str
msg: Message
mail_options: Dict = {}
rcpt_options: Dict = {}
is_forward: bool = False
ignore_smtp_errors: bool = False
class MailSender:
def __init__(self):
self._pool: Optional[ThreadPoolExecutor] = None
def enable_background_pool(self, max_workers=10):
self._pool = ThreadPoolExecutor(max_workers=max_workers)
def send(self, send_request: SendRequest, retries: int = 2):
"""replace smtp.sendmail"""
if NOT_SEND_EMAIL:
LOG.d(
"send email with subject '%s', from '%s' to '%s'",
send_request.msg[headers.SUBJECT],
send_request.msg[headers.FROM],
send_request.msg[headers.TO],
)
return
if not self._pool:
self._send_to_smtp(send_request, retries)
else:
self._pool.submit(self._send_to_smtp, (send_request, retries))
def _send_to_smtp(self, send_request: SendRequest, retries: int):
try:
start = time.time()
if POSTFIX_SUBMISSION_TLS:
smtp_port = 587
else:
smtp_port = POSTFIX_PORT
with SMTP(POSTFIX_SERVER, smtp_port) as smtp:
if POSTFIX_SUBMISSION_TLS:
smtp.starttls()
elapsed = time.time() - start
LOG.d("getting a smtp connection takes seconds %s", elapsed)
newrelic.agent.record_custom_metric(
"Custom/smtp_connection_time", elapsed
)
# smtp.send_message has UnicodeEncodeError
# encode message raw directly instead
LOG.d(
"Sendmail mail_from:%s, rcpt_to:%s, header_from:%s, header_to:%s, header_cc:%s",
send_request.from_address,
send_request.to_address,
send_request.msg[headers.FROM],
send_request.msg[headers.TO],
send_request.msg[headers.CC],
)
smtp.sendmail(
send_request.from_address,
send_request.to_address,
message_to_bytes(send_request.msg),
send_request.mail_options,
send_request.rcpt_options,
)
newrelic.agent.record_custom_metric(
"Custom/smtp_sending_time", time.time() - start
)
except (SMTPServerDisconnected, SMTPRecipientsRefused) as e:
if retries > 0:
LOG.w(
"SMTPServerDisconnected or SMTPRecipientsRefused error %s, retry",
e,
exc_info=True,
)
time.sleep(0.3 * send_request.retries)
self._send_to_smtp(send_request, retries - 1)
else:
if send_request.ignore_smtp_error:
LOG.w("Ignore smtp error %s", e)
else:
raise
mail_sender = MailSender()
def sl_sendmail(
from_address: str,
to_address: str,
msg: Message,
mail_options=(),
rcpt_options=(),
is_forward: bool = False,
retries=2,
ignore_smtp_error=False,
):
send_request = SendRequest(
from_address,
to_address,
msg,
mail_options,
rcpt_options,
is_forward,
ignore_smtp_error,
)
mail_sender.send(send_request, retries)

21
app/message_utils.py Normal file
View file

@ -0,0 +1,21 @@
from email import policy
from email.message import Message
from app.log import LOG
def message_to_bytes(msg: Message) -> bytes:
"""replace Message.as_bytes() method by trying different policies"""
for generator_policy in [None, policy.SMTP, policy.SMTPUTF8]:
try:
return msg.as_bytes(policy=generator_policy)
except:
LOG.w("as_bytes() fails with %s policy", policy, exc_info=True)
msg_string = msg.as_string()
try:
return msg_string.encode()
except:
LOG.w("as_string().encode() fails", exc_info=True)
return msg_string.encode(errors="replace")

View file

@ -104,7 +104,6 @@ from app.email_utils import (
send_email_with_rate_control,
get_email_domain_part,
copy,
to_bytes,
send_email_at_most_times,
is_valid_alias_address_domain,
should_add_dkim_signature,
@ -118,7 +117,6 @@ from app.email_utils import (
should_disable,
parse_id_from_bounce,
spf_pass,
sl_sendmail,
sanitize_header,
get_queue_id,
should_ignore_bounce,
@ -147,6 +145,8 @@ from app.handler.spamd_result import (
SPFCheckResult,
)
from app.log import LOG, set_message_id
from app.mail_sender import sl_sendmail
from app.message_utils import message_to_bytes
from app.models import (
Alias,
Contact,
@ -501,7 +501,7 @@ def prepare_pgp_message(
# encrypt
# use pgpy as fallback
msg_bytes = to_bytes(clone_msg)
msg_bytes = message_to_bytes(clone_msg)
try:
encrypted_data = pgp_utils.encrypt_file(BytesIO(msg_bytes), pgp_fingerprint)
second.set_payload(encrypted_data)
@ -527,11 +527,11 @@ def sign_msg(msg: Message) -> Message:
signature.add_header("Content-Disposition", 'attachment; filename="signature.asc"')
try:
signature.set_payload(sign_data(to_bytes(msg).replace(b"\n", b"\r\n")))
signature.set_payload(sign_data(message_to_bytes(msg).replace(b"\n", b"\r\n")))
except Exception:
LOG.e("Cannot sign, try using pgpy")
signature.set_payload(
sign_data_with_pgpy(to_bytes(msg).replace(b"\n", b"\r\n"))
sign_data_with_pgpy(message_to_bytes(msg).replace(b"\n", b"\r\n"))
)
container.attach(signature)
@ -543,7 +543,9 @@ def handle_email_sent_to_ourself(alias, from_addr: str, msg: Message, user):
# store the refused email
random_name = str(uuid.uuid4())
full_report_path = f"refused-emails/cycle-{random_name}.eml"
s3.upload_email_from_bytesio(full_report_path, BytesIO(to_bytes(msg)), random_name)
s3.upload_email_from_bytesio(
full_report_path, BytesIO(message_to_bytes(msg)), random_name
)
refused_email = RefusedEmail.create(
path=None, full_report_path=full_report_path, user_id=alias.user_id
)
@ -1394,7 +1396,7 @@ def handle_bounce_forward_phase(msg: Message, email_log: EmailLog):
full_report_path = f"refused-emails/full-{random_name}.eml"
s3.upload_email_from_bytesio(
full_report_path, BytesIO(to_bytes(msg)), f"full-{random_name}"
full_report_path, BytesIO(message_to_bytes(msg)), f"full-{random_name}"
)
file_path = None
@ -1413,7 +1415,7 @@ def handle_bounce_forward_phase(msg: Message, email_log: EmailLog):
else:
file_path = f"refused-emails/{random_name}.eml"
s3.upload_email_from_bytesio(
file_path, BytesIO(to_bytes(orig_msg)), random_name
file_path, BytesIO(message_to_bytes(orig_msg)), random_name
)
refused_email = RefusedEmail.create(
@ -1733,14 +1735,16 @@ def handle_bounce_reply_phase(envelope, msg: Message, email_log: EmailLog):
random_name = str(uuid.uuid4())
full_report_path = f"refused-emails/full-{random_name}.eml"
s3.upload_email_from_bytesio(full_report_path, BytesIO(to_bytes(msg)), random_name)
s3.upload_email_from_bytesio(
full_report_path, BytesIO(message_to_bytes(msg)), random_name
)
orig_msg = get_orig_message_from_bounce(msg)
file_path = None
if orig_msg:
file_path = f"refused-emails/{random_name}.eml"
s3.upload_email_from_bytesio(
file_path, BytesIO(to_bytes(orig_msg)), random_name
file_path, BytesIO(message_to_bytes(orig_msg)), random_name
)
refused_email = RefusedEmail.create(
@ -1809,13 +1813,15 @@ def handle_spam(
random_name = str(uuid.uuid4())
full_report_path = f"spams/full-{random_name}.eml"
s3.upload_email_from_bytesio(full_report_path, BytesIO(to_bytes(msg)), random_name)
s3.upload_email_from_bytesio(
full_report_path, BytesIO(message_to_bytes(msg)), random_name
)
file_path = None
if orig_msg:
file_path = f"spams/{random_name}.eml"
s3.upload_email_from_bytesio(
file_path, BytesIO(to_bytes(orig_msg)), random_name
file_path, BytesIO(message_to_bytes(orig_msg)), random_name
)
refused_email = RefusedEmail.create(

View file

@ -1,7 +1,23 @@
from app.alias_utils import delete_alias, check_alias_prefix
from typing import List
from app.alias_utils import (
delete_alias,
check_alias_prefix,
get_user_if_alias_would_auto_create,
try_auto_create,
)
from app.config import ALIAS_DOMAINS
from app.db import Session
from app.models import Alias, DeletedAlias
from tests.utils import create_new_user
from app.models import (
Alias,
DeletedAlias,
CustomDomain,
AutoCreateRule,
Directory,
DirectoryMailbox,
User,
)
from tests.utils import create_new_user, random_domain, random_token
def test_delete_alias(flask_client):
@ -44,3 +60,75 @@ def test_check_alias_prefix(flask_client):
assert not check_alias_prefix("a b")
assert not check_alias_prefix("+👌")
assert not check_alias_prefix("too-long" * 10)
def get_auto_create_alias_tests(user: User) -> List:
user.lifetime = True
catchall = CustomDomain.create(
user_id=user.id,
catch_all=True,
domain=random_domain(),
verified=True,
flush=True,
)
no_catchall = CustomDomain.create(
user_id=user.id,
catch_all=False,
domain=random_domain(),
verified=True,
flush=True,
)
no_catchall_with_rule = CustomDomain.create(
user_id=user.id,
catch_all=False,
domain=random_domain(),
verified=True,
flush=True,
)
AutoCreateRule.create(
custom_domain_id=no_catchall_with_rule.id,
order=0,
regex="ok-.*",
flush=True,
)
dir_name = random_token()
directory = Directory.create(name=dir_name, user_id=user.id, flush=True)
DirectoryMailbox.create(
directory_id=directory.id, mailbox_id=user.default_mailbox_id, flush=True
)
Session.commit()
return [
(f"nonexistant@{catchall.domain}", True),
(f"nonexistant@{no_catchall.domain}", False),
(f"nonexistant@{no_catchall_with_rule.domain}", False),
(f"ok-nonexistant@{no_catchall_with_rule.domain}", True),
(f"{dir_name}+something@nowhere.net", False),
(f"{dir_name}#something@nowhere.net", False),
(f"{dir_name}/something@nowhere.net", False),
(f"{dir_name}+something@{ALIAS_DOMAINS[0]}", True),
(f"{dir_name}#something@{ALIAS_DOMAINS[0]}", True),
(f"{dir_name}/something@{ALIAS_DOMAINS[0]}", True),
]
def test_get_user_if_alias_would_auto_create(flask_client):
user = create_new_user()
for test_id, (address, expected_ok) in enumerate(get_auto_create_alias_tests(user)):
result = get_user_if_alias_would_auto_create(address)
if expected_ok:
assert (
isinstance(result, User) and result.id == user.id
), f"Case {test_id} - Failed address {address}"
else:
assert not result, f"Case {test_id} - Failed address {address}"
def test_auto_create_alias(flask_client):
user = create_new_user()
for test_id, (address, expected_ok) in enumerate(get_auto_create_alias_tests(user)):
result = try_auto_create(address)
if expected_ok:
assert result, f"Case {test_id} - Failed address {address}"
else:
assert result is None, f"Case {test_id} - Failed address {address}"

View file

@ -19,7 +19,6 @@ from app.email_utils import (
get_header_from_bounce,
is_valid_email,
add_header,
to_bytes,
generate_reply_email,
normalize_reply_email,
get_encoding,
@ -161,23 +160,6 @@ def test_send_email_with_rate_control(flask_client):
)
def test_copy():
email_str = """
From: abcd@gmail.com
To: hey@example.org
Subject: subject
Body
"""
msg = email.message_from_string(email_str)
msg2 = copy(msg)
assert to_bytes(msg) == to_bytes(msg2)
msg = email.message_from_string("👌")
msg2 = copy(msg)
assert to_bytes(msg) == to_bytes(msg2)
def test_get_spam_from_header():
is_spam, _ = get_spam_from_header(
"""No, score=-0.1 required=5.0 tests=DKIM_SIGNED,DKIM_VALID,
@ -476,19 +458,6 @@ Content-Type: text/html; charset=us-ascii
assert "old" not in new_msg.as_string()
def test_to_bytes():
msg = email.message_from_string("☕️ emoji")
assert to_bytes(msg)
# \n is appended when message is converted to bytes
assert to_bytes(msg).decode() == "\n☕️ emoji"
msg = email.message_from_string("ascii")
assert to_bytes(msg) == b"\nascii"
msg = email.message_from_string("éèà€")
assert to_bytes(msg).decode() == "\néèà€"
def test_generate_reply_email(flask_client):
user = create_new_user()
reply_email = generate_reply_email("test@example.org", user)

View file

@ -0,0 +1,35 @@
import email
from app.email_utils import (
copy,
)
from app.message_utils import message_to_bytes
def test_copy():
email_str = """
From: abcd@gmail.com
To: hey@example.org
Subject: subject
Body
"""
msg = email.message_from_string(email_str)
msg2 = copy(msg)
assert message_to_bytes(msg) == message_to_bytes(msg2)
msg = email.message_from_string("👌")
msg2 = copy(msg)
assert message_to_bytes(msg) == message_to_bytes(msg2)
def test_to_bytes():
msg = email.message_from_string("☕️ emoji")
assert message_to_bytes(msg)
# \n is appended when message is converted to bytes
assert message_to_bytes(msg).decode() == "\n☕️ emoji"
msg = email.message_from_string("ascii")
assert message_to_bytes(msg) == b"\nascii"
msg = email.message_from_string("éèà€")
assert message_to_bytes(msg).decode() == "\néèà€"