Merge pull request #872 from simple-login/ac-dmarc-reply-phase

Apply dmarc policy to the reply phase
This commit is contained in:
Son Nguyen Kim 2022-04-12 18:25:32 +02:00 committed by GitHub
commit 80a45b4b07
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 458 additions and 263 deletions

View file

@ -304,6 +304,9 @@ MAX_ALERT_24H = 4
# When a reverse-alias receives emails from un unknown mailbox # When a reverse-alias receives emails from un unknown mailbox
ALERT_REVERSE_ALIAS_UNKNOWN_MAILBOX = "reverse_alias_unknown_mailbox" ALERT_REVERSE_ALIAS_UNKNOWN_MAILBOX = "reverse_alias_unknown_mailbox"
# When somebody is trying to spoof a reply
ALERT_DMARC_FAILED_REPLY_PHASE = "dmarc_failed_reply_phase"
# When a forwarding email is bounced # When a forwarding email is bounced
ALERT_BOUNCE_EMAIL = "bounce" ALERT_BOUNCE_EMAIL = "bounce"

View file

@ -73,9 +73,6 @@ from app.models import (
TransactionalEmail, TransactionalEmail,
IgnoreBounceSender, IgnoreBounceSender,
InvalidMailboxDomain, InvalidMailboxDomain,
DmarcCheckResult,
SpamdResult,
SPFCheckResult,
) )
from app.utils import ( from app.utils import (
random_string, random_string,
@ -1465,28 +1462,3 @@ def save_envelope_for_debugging(envelope: Envelope, file_name_prefix=None) -> st
return file_name return file_name
return "" return ""
def get_spamd_result(msg: Message) -> Optional[SpamdResult]:
spam_result_header = msg.get_all(headers.SPAMD_RESULT)
if not spam_result_header:
newrelic.agent.record_custom_event("SpamdCheck", {"header": "missing"})
return None
spam_entries = [entry.strip() for entry in str(spam_result_header[-1]).split("\n")]
for entry_pos in range(len(spam_entries)):
sep = spam_entries[entry_pos].find("(")
if sep > -1:
spam_entries[entry_pos] = spam_entries[entry_pos][:sep]
spamd_result = SpamdResult()
for header_value, dmarc_result in DmarcCheckResult.get_string_dict().items():
if header_value in spam_entries:
spamd_result.set_dmarc_result(dmarc_result)
for header_value, spf_result in SPFCheckResult.get_string_dict().items():
if header_value in spam_entries:
spamd_result.set_spf_result(spf_result)
newrelic.agent.record_custom_event("SpamdCheck", spamd_result.event_data())
return spamd_result

View file

@ -56,8 +56,3 @@ class MailSentFromReverseAlias(SLException):
"""raised when receiving an email sent from a reverse alias""" """raised when receiving an email sent from a reverse alias"""
pass pass
class DmarcSoftFail(SLException):
pass

155
app/handler/dmarc.py Normal file
View file

@ -0,0 +1,155 @@
import uuid
from io import BytesIO
from typing import Optional, Tuple
from aiosmtpd.handlers import Message
from aiosmtpd.smtp import Envelope
from app import s3
from app.config import (
DMARC_CHECK_ENABLED,
ALERT_QUARANTINE_DMARC,
ALERT_DMARC_FAILED_REPLY_PHASE,
)
from app.email import headers, status
from app.email_utils import (
get_header_unicode,
send_email_with_rate_control,
render,
add_or_replace_header,
to_bytes,
add_header,
)
from app.handler.spamd_result import SpamdResult, Phase, DmarcCheckResult
from app.log import LOG
from app.models import Alias, Contact, Notification, EmailLog, RefusedEmail
def apply_dmarc_policy_for_forward_phase(
alias: Alias, contact: Contact, envelope: Envelope, msg: Message
) -> Tuple[Message, Optional[str]]:
spam_result = SpamdResult.extract_from_headers(msg, Phase.forward)
if not DMARC_CHECK_ENABLED or not spam_result:
return msg, None
from_header = get_header_unicode(msg[headers.FROM])
if spam_result.dmarc == DmarcCheckResult.soft_fail:
LOG.w(
f"dmarc forward: soft_fail from contact {contact.email} to alias {alias.email}."
f"mail_from:{envelope.mail_from}, from_header: {from_header}"
)
changed_msg = add_header(
msg,
f"""This email failed anti-phishing checks when it was received by SimpleLogin, be careful with its content.""",
f"""
<p style="color:red">
This email failed anti-phishing checks when it was received by SimpleLogin, be careful with its content.
</p>
""",
)
return changed_msg, None
if spam_result.dmarc in (
DmarcCheckResult.quarantine,
DmarcCheckResult.reject,
):
LOG.w(
f"dmarc forward: put email from {contact} to {alias} to quarantine. {spam_result.event_data()}, "
f"mail_from:{envelope.mail_from}, from_header: {msg[headers.FROM]}"
)
email_log = quarantine_dmarc_failed_forward_email(alias, contact, envelope, msg)
Notification.create(
user_id=alias.user_id,
title=f"{alias.email} has a new mail in quarantine",
message=Notification.render(
"notification/message-quarantine.html", alias=alias
),
commit=True,
)
user = alias.user
send_email_with_rate_control(
user,
ALERT_QUARANTINE_DMARC,
user.email,
f"An email sent to {alias.email} has been quarantined",
render(
"transactional/message-quarantine-dmarc.txt.jinja2",
from_header=from_header,
alias=alias,
refused_email_url=email_log.get_dashboard_url(),
),
render(
"transactional/message-quarantine-dmarc.html",
from_header=from_header,
alias=alias,
refused_email_url=email_log.get_dashboard_url(),
),
max_nb_alert=10,
ignore_smtp_error=True,
)
return msg, status.E215
return msg, None
def quarantine_dmarc_failed_forward_email(alias, contact, envelope, msg) -> EmailLog:
add_or_replace_header(msg, headers.SL_DIRECTION, "Forward")
msg[headers.SL_ENVELOPE_FROM] = envelope.mail_from
random_name = str(uuid.uuid4())
s3_report_path = f"refused-emails/full-{random_name}.eml"
s3.upload_email_from_bytesio(
s3_report_path, BytesIO(to_bytes(msg)), f"full-{random_name}"
)
refused_email = RefusedEmail.create(
full_report_path=s3_report_path, user_id=alias.user_id, flush=True
)
return EmailLog.create(
user_id=alias.user_id,
mailbox_id=alias.mailbox_id,
contact_id=contact.id,
alias_id=alias.id,
message_id=str(msg[headers.MESSAGE_ID]),
refused_email_id=refused_email.id,
is_spam=True,
blocked=True,
commit=True,
)
def apply_dmarc_policy_for_reply_phase(
alias_from: Alias, contact_recipient: Contact, envelope: Envelope, msg: Message
) -> Optional[str]:
spam_result = SpamdResult.extract_from_headers(msg, Phase.reply)
if not DMARC_CHECK_ENABLED or not spam_result:
return None
if spam_result.dmarc not in (
DmarcCheckResult.quarantine,
DmarcCheckResult.reject,
DmarcCheckResult.soft_fail,
):
return None
LOG.w(
f"dmarc reply: Put email from {alias_from.email} to {contact_recipient} into quarantine. {spam_result.event_data()}, "
f"mail_from:{envelope.mail_from}, from_header: {msg[headers.FROM]}"
)
send_email_with_rate_control(
alias_from.user,
ALERT_DMARC_FAILED_REPLY_PHASE,
alias_from.user.email,
f"Attempt to send an email to your contact {contact_recipient.email} from {envelope.mail_from}",
render(
"transactional/spoof-reply.txt",
contact=contact_recipient,
alias=alias_from,
sender=envelope.mail_from,
),
render(
"transactional/spoof-reply.html",
contact=contact_recipient,
alias=alias_from,
sender=envelope.mail_from,
),
)
return status.E215

127
app/handler/spamd_result.py Normal file
View file

@ -0,0 +1,127 @@
from __future__ import annotations
from typing import Dict, Optional
import newrelic
from app.email import headers
from app.models import EnumE
from email.message import Message
class Phase(EnumE):
unknown = 0
forward = 1
reply = 2
class DmarcCheckResult(EnumE):
allow = 0
soft_fail = 1
quarantine = 2
reject = 3
not_available = 4
bad_policy = 5
@staticmethod
def get_string_dict():
return {
"DMARC_POLICY_ALLOW": DmarcCheckResult.allow,
"DMARC_POLICY_SOFTFAIL": DmarcCheckResult.soft_fail,
"DMARC_POLICY_QUARANTINE": DmarcCheckResult.quarantine,
"DMARC_POLICY_REJECT": DmarcCheckResult.reject,
"DMARC_NA": DmarcCheckResult.not_available,
"DMARC_BAD_POLICY": DmarcCheckResult.bad_policy,
}
class SPFCheckResult(EnumE):
allow = 0
fail = 1
soft_fail = 1
neutral = 2
temp_error = 3
not_available = 4
perm_error = 5
@staticmethod
def get_string_dict():
return {
"R_SPF_ALLOW": SPFCheckResult.allow,
"R_SPF_FAIL": SPFCheckResult.fail,
"R_SPF_SOFTFAIL": SPFCheckResult.soft_fail,
"R_SPF_NEUTRAL": SPFCheckResult.neutral,
"R_SPF_DNSFAIL": SPFCheckResult.temp_error,
"R_SPF_NA": SPFCheckResult.not_available,
"R_SPF_PERMFAIL": SPFCheckResult.perm_error,
}
class SpamdResult:
def __init__(self, phase: Phase = Phase.unknown):
self.phase: Phase = phase
self.dmarc: DmarcCheckResult = DmarcCheckResult.not_available
self.spf: SPFCheckResult = SPFCheckResult.not_available
def set_dmarc_result(self, dmarc_result: DmarcCheckResult):
self.dmarc = dmarc_result
def set_spf_result(self, spf_result: SPFCheckResult):
self.spf = spf_result
def event_data(self) -> Dict:
return {
"header": "present",
"dmarc": self.dmarc.name,
"spf": self.spf.name,
"phase": self.phase.name,
}
@classmethod
def extract_from_headers(
cls, msg: Message, phase: Phase = Phase.unknown
) -> Optional[SpamdResult]:
cached = cls._get_from_message(msg)
if cached:
return cached
spam_result_header = msg.get_all(headers.SPAMD_RESULT)
if not spam_result_header:
return None
spam_entries = [
entry.strip() for entry in str(spam_result_header[-1]).split("\n")
]
for entry_pos in range(len(spam_entries)):
sep = spam_entries[entry_pos].find("(")
if sep > -1:
spam_entries[entry_pos] = spam_entries[entry_pos][:sep]
spamd_result = SpamdResult(phase)
for header_value, dmarc_result in DmarcCheckResult.get_string_dict().items():
if header_value in spam_entries:
spamd_result.set_dmarc_result(dmarc_result)
break
for header_value, spf_result in SPFCheckResult.get_string_dict().items():
if header_value in spam_entries:
spamd_result.set_spf_result(spf_result)
break
cls._store_in_message(spamd_result, msg)
return spamd_result
@classmethod
def _store_in_message(cls, check: SpamdResult, msg: Message):
msg.spamd_check = check
@classmethod
def _get_from_message(cls, msg: Message) -> Optional[SpamdResult]:
return getattr(msg, "spamd_check", None)
@classmethod
def send_to_new_relic(cls, msg: Message):
check = cls._get_from_message(msg)
if check:
newrelic.agent.record_custom_event("SpamdCheck", check.event_data())
else:
newrelic.agent.record_custom_event("SpamdCheck", {"header": "missing"})

View file

@ -3,7 +3,7 @@ import os
import random import random
import uuid import uuid
from email.utils import formataddr from email.utils import formataddr
from typing import List, Tuple, Optional, Dict from typing import List, Tuple, Optional
import arrow import arrow
import sqlalchemy as sa import sqlalchemy as sa
@ -237,63 +237,6 @@ class AuditLogActionEnum(EnumE):
extend_subscription = 7 extend_subscription = 7
class DmarcCheckResult(EnumE):
allow = 0
soft_fail = 1
quarantine = 2
reject = 3
not_available = 4
bad_policy = 5
@staticmethod
def get_string_dict():
return {
"DMARC_POLICY_ALLOW": DmarcCheckResult.allow,
"DMARC_POLICY_SOFTFAIL": DmarcCheckResult.soft_fail,
"DMARC_POLICY_QUARANTINE": DmarcCheckResult.quarantine,
"DMARC_POLICY_REJECT": DmarcCheckResult.reject,
"DMARC_NA": DmarcCheckResult.not_available,
"DMARC_BAD_POLICY": DmarcCheckResult.bad_policy,
}
class SPFCheckResult(EnumE):
allow = 0
fail = 1
soft_fail = 1
neutral = 2
temp_error = 3
not_available = 4
perm_error = 5
@staticmethod
def get_string_dict():
return {
"R_SPF_ALLOW": SPFCheckResult.allow,
"R_SPF_FAIL": SPFCheckResult.fail,
"R_SPF_SOFTFAIL": SPFCheckResult.soft_fail,
"R_SPF_NEUTRAL": SPFCheckResult.neutral,
"R_SPF_DNSFAIL": SPFCheckResult.temp_error,
"R_SPF_NA": SPFCheckResult.not_available,
"R_SPF_PERMFAIL": SPFCheckResult.perm_error,
}
class SpamdResult:
def __init__(self):
self.dmarc: DmarcCheckResult = DmarcCheckResult.not_available
self.spf: SPFCheckResult = SPFCheckResult.not_available
def set_dmarc_result(self, dmarc_result: DmarcCheckResult):
self.dmarc = dmarc_result
def set_spf_result(self, spf_result: SPFCheckResult):
self.spf = spf_result
def event_data(self) -> Dict:
return {"header": "present", "dmarc": self.dmarc, "spf": self.spf}
class Hibp(Base, ModelMixin): class Hibp(Base, ModelMixin):
__tablename__ = "hibp" __tablename__ = "hibp"
name = sa.Column(sa.String(), nullable=False, unique=True, index=True) name = sa.Column(sa.String(), nullable=False, unique=True, index=True)

View file

@ -87,10 +87,16 @@ from app.config import (
OLD_UNSUBSCRIBER, OLD_UNSUBSCRIBER,
ALERT_FROM_ADDRESS_IS_REVERSE_ALIAS, ALERT_FROM_ADDRESS_IS_REVERSE_ALIAS,
ALERT_TO_NOREPLY, ALERT_TO_NOREPLY,
DMARC_CHECK_ENABLED,
ALERT_QUARANTINE_DMARC,
) )
from app.db import Session from app.db import Session
from app.handler.dmarc import (
apply_dmarc_policy_for_reply_phase,
apply_dmarc_policy_for_forward_phase,
)
from app.handler.spamd_result import (
SpamdResult,
SPFCheckResult,
)
from app.email import status, headers from app.email import status, headers
from app.email.rate_limit import rate_limited from app.email.rate_limit import rate_limited
from app.email.spam import get_spam_score from app.email.spam import get_spam_score
@ -130,7 +136,6 @@ from app.email_utils import (
get_orig_message_from_yahoo_complaint, get_orig_message_from_yahoo_complaint,
get_mailbox_bounce_info, get_mailbox_bounce_info,
save_email_for_debugging, save_email_for_debugging,
get_spamd_result,
save_envelope_for_debugging, save_envelope_for_debugging,
) )
from app.errors import ( from app.errors import (
@ -139,7 +144,6 @@ from app.errors import (
VERPForward, VERPForward,
VERPReply, VERPReply,
CannotCreateContactForReverseAlias, CannotCreateContactForReverseAlias,
DmarcSoftFail,
) )
from app.log import LOG, set_message_id from app.log import LOG, set_message_id
from app.models import ( from app.models import (
@ -157,8 +161,6 @@ from app.models import (
DeletedAlias, DeletedAlias,
DomainDeletedAlias, DomainDeletedAlias,
Notification, Notification,
DmarcCheckResult,
SPFCheckResult,
) )
from app.pgp_utils import PGPException, sign_data_with_pgpy, sign_data from app.pgp_utils import PGPException, sign_data_with_pgpy, sign_data
from app.utils import sanitize_email from app.utils import sanitize_email
@ -543,99 +545,6 @@ def handle_email_sent_to_ourself(alias, from_addr: str, msg: Message, user):
) )
def apply_dmarc_policy(
alias: Alias, contact: Contact, envelope: Envelope, msg: Message
) -> Optional[str]:
spam_result = get_spamd_result(msg)
if not DMARC_CHECK_ENABLED or not spam_result:
return None
from_header = get_header_unicode(msg[headers.FROM])
if spam_result.dmarc == DmarcCheckResult.soft_fail:
LOG.w(
f"dmarc soft_fail from contact {contact.email} to alias {alias.email}."
f"mail_from:{envelope.mail_from}, from_header: {from_header}"
)
raise DmarcSoftFail
if spam_result.dmarc in (
DmarcCheckResult.quarantine,
DmarcCheckResult.reject,
):
LOG.w(
f"put email from {contact} to {alias} to quarantine. {spam_result.event_data()}, "
f"mail_from:{envelope.mail_from}, from_header: {msg[headers.FROM]}"
)
email_log = quarantine_dmarc_failed_email(alias, contact, envelope, msg)
Notification.create(
user_id=alias.user_id,
title=f"{alias.email} has a new mail in quarantine",
message=Notification.render(
"notification/message-quarantine.html", alias=alias
),
commit=True,
)
user = alias.user
send_email_with_rate_control(
user,
ALERT_QUARANTINE_DMARC,
user.email,
f"An email sent to {alias.email} has been quarantined",
render(
"transactional/message-quarantine-dmarc.txt.jinja2",
from_header=from_header,
alias=alias,
refused_email_url=email_log.get_dashboard_url(),
),
render(
"transactional/message-quarantine-dmarc.html",
from_header=from_header,
alias=alias,
refused_email_url=email_log.get_dashboard_url(),
),
max_nb_alert=10,
ignore_smtp_error=True,
)
return status.E215
return None
def quarantine_dmarc_failed_email(alias, contact, envelope, msg) -> EmailLog:
add_or_replace_header(msg, headers.SL_DIRECTION, "Forward")
msg[headers.SL_ENVELOPE_TO] = alias.email
msg[headers.SL_ENVELOPE_FROM] = envelope.mail_from
add_or_replace_header(msg, "From", contact.new_addr())
# replace CC & To emails by reverse-alias for all emails that are not alias
try:
replace_header_when_forward(msg, alias, "Cc")
replace_header_when_forward(msg, alias, "To")
except CannotCreateContactForReverseAlias:
Session.commit()
raise
random_name = str(uuid.uuid4())
s3_report_path = f"refused-emails/full-{random_name}.eml"
s3.upload_email_from_bytesio(
s3_report_path, BytesIO(to_bytes(msg)), f"full-{random_name}"
)
refused_email = RefusedEmail.create(
full_report_path=s3_report_path, user_id=alias.user_id, flush=True
)
return EmailLog.create(
user_id=alias.user_id,
mailbox_id=alias.mailbox_id,
contact_id=contact.id,
alias_id=alias.id,
message_id=str(msg[headers.MESSAGE_ID]),
refused_email_id=refused_email.id,
is_spam=True,
blocked=True,
commit=True,
)
def handle_forward(envelope, msg: Message, rcpt_to: str) -> List[Tuple[bool, str]]: def handle_forward(envelope, msg: Message, rcpt_to: str) -> List[Tuple[bool, str]]:
"""return an array of SMTP status (is_success, smtp_status) """return an array of SMTP status (is_success, smtp_status)
is_success indicates whether an email has been delivered and is_success indicates whether an email has been delivered and
@ -717,20 +626,11 @@ def handle_forward(envelope, msg: Message, rcpt_to: str) -> List[Tuple[bool, str
return [(True, res_status)] return [(True, res_status)]
# Check if we need to reject or quarantine based on dmarc # Check if we need to reject or quarantine based on dmarc
try: msg, dmarc_delivery_status = apply_dmarc_policy_for_forward_phase(
dmarc_delivery_status = apply_dmarc_policy(alias, contact, envelope, msg) alias, contact, envelope, msg
)
if dmarc_delivery_status is not None: if dmarc_delivery_status is not None:
return [(False, dmarc_delivery_status)] return [(False, dmarc_delivery_status)]
except DmarcSoftFail:
msg = add_header(
msg,
f"""This email failed anti-phishing checks when it was received by SimpleLogin, be careful with its content.""",
f"""
<p style="color:red">
This email failed anti-phishing checks when it was received by SimpleLogin, be careful with its content.
</p>
""",
)
ret = [] ret = []
mailboxes = alias.mailboxes mailboxes = alias.mailboxes
@ -1042,6 +942,7 @@ def handle_reply(envelope, msg: Message, rcpt_to: str) -> (bool, str):
Return whether an email has been delivered and Return whether an email has been delivered and
the smtp status ("250 Message accepted", "550 Non-existent email address", etc) the smtp status ("250 Message accepted", "550 Non-existent email address", etc)
""" """
reply_email = rcpt_to reply_email = rcpt_to
# reply_email must end with EMAIL_DOMAIN # reply_email must end with EMAIL_DOMAIN
@ -1077,7 +978,14 @@ def handle_reply(envelope, msg: Message, rcpt_to: str) -> (bool, str):
alias, alias,
contact, contact,
) )
return [(False, status.E504)] return False, status.E504
# Check if we need to reject or quarantine based on dmarc
dmarc_delivery_status = apply_dmarc_policy_for_reply_phase(
alias, contact, envelope, msg
)
if dmarc_delivery_status is not None:
return False, dmarc_delivery_status
# Anti-spoofing # Anti-spoofing
mailbox = get_mailbox_from_mail_from(mail_from, alias) mailbox = get_mailbox_from_mail_from(mail_from, alias)
@ -2613,9 +2521,9 @@ class MailHandler:
return_status = handle(envelope, msg) return_status = handle(envelope, msg)
elapsed = time.time() - start elapsed = time.time() - start
# Only bounce messages if the return-path passes the spf check. Otherwise black-hole it. # Only bounce messages if the return-path passes the spf check. Otherwise black-hole it.
spamd_result = SpamdResult.extract_from_headers(msg)
if return_status[0] == "5": if return_status[0] == "5":
spamd_result = get_spamd_result(msg) if spamd_result and spamd_result.spf in (
if spamd_result and get_spamd_result(msg).spf in (
SPFCheckResult.fail, SPFCheckResult.fail,
SPFCheckResult.soft_fail, SPFCheckResult.soft_fail,
): ):
@ -2631,6 +2539,8 @@ class MailHandler:
elapsed, elapsed,
return_status, return_status,
) )
SpamdResult.send_to_new_relic(msg)
newrelic.agent.record_custom_metric("Custom/email_handler_time", elapsed) newrelic.agent.record_custom_metric("Custom/email_handler_time", elapsed)
newrelic.agent.record_custom_metric("Custom/number_incoming_email", 1) newrelic.agent.record_custom_metric("Custom/number_incoming_email", 1)
return return_status return return_status

View file

@ -0,0 +1,20 @@
{% extends "base.html" %}
{% block content %}
{% call text() %}
<h1>
An attempt to send a fake email to {{ contact.email }} from your alias <b>{{ alias.email }}</b> using <b>{{ sender }}</b> has been blocked.
</h1>
{% endcall %}
{% call text() %}
As a measure to protect against <b>email spoofing</b>, we have blocked an attempt to send an email from your alias <b>{{ alias.email }}</b> using <b>{{ sender }}</b>.
{% endcall %}
{% call text() %}
Best, <br/>
SimpleLogin Team.
{% endcall %}
{% endblock %}

View file

@ -0,0 +1,8 @@
{% extends "base.txt.jinja2" %}
{% block content %}
An attempt to send a fake email to {{ contact.email }} from your alias {{ alias.email }} using {{ sender }} has been blocked.
As a measure to protect against email spoofing, we have blocked an attempt to send an email from your alias {{ alias.email }} using {{ sender }}.
{% endblock %}

View file

View file

@ -0,0 +1,25 @@
X-SimpleLogin-Client-IP: 54.39.200.130
Received-SPF: Softfail (mailfrom) identity=mailfrom; client-ip=34.59.200.130;
helo=relay.somewhere.net; envelope-from=everwaste@gmail.com;
receiver=<UNKNOWN>
Received: from relay.somewhere.net (relay.somewhere.net [34.59.200.130])
(using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits))
(No client certificate requested)
by mx1.sldev.ovh (Postfix) with ESMTPS id 6D8C13F069
for <wehrman_mannequin@sldev.ovh>; Thu, 17 Mar 2022 16:50:20 +0000 (UTC)
Date: Thu, 17 Mar 2022 16:50:18 +0000
To: {{ contact_email }}
From: {{ alias_email }}
Subject: test Thu, 17 Mar 2022 16:50:18 +0000
Message-Id: <20220317165018.000191@somewhere-5488dd4b6b-7crp6>
X-Mailer: swaks v20201014.0 jetmore.org/john/code/swaks/
X-Rspamd-Queue-Id: 6D8C13F069
X-Rspamd-Server: staging1
X-Spamd-Result: default: False [0.50 / 13.00];
{{ dmarc_result }}(0.00)[];
X-Rspamd-Pre-Result: action=add header;
module=force_actions;
unknown reason
X-Spam: Yes
This is a test mailing

View file

View file

@ -0,0 +1,34 @@
from app.handler.spamd_result import DmarcCheckResult, SpamdResult
from tests.utils import load_eml_file
def test_dmarc_result_softfail():
msg = load_eml_file("dmarc_gmail_softfail.eml")
assert DmarcCheckResult.soft_fail == SpamdResult.extract_from_headers(msg).dmarc
def test_dmarc_result_quarantine():
msg = load_eml_file("dmarc_quarantine.eml")
assert DmarcCheckResult.quarantine == SpamdResult.extract_from_headers(msg).dmarc
def test_dmarc_result_reject():
msg = load_eml_file("dmarc_reject.eml")
assert DmarcCheckResult.reject == SpamdResult.extract_from_headers(msg).dmarc
def test_dmarc_result_allow():
msg = load_eml_file("dmarc_allow.eml")
assert DmarcCheckResult.allow == SpamdResult.extract_from_headers(msg).dmarc
def test_dmarc_result_na():
msg = load_eml_file("dmarc_na.eml")
assert DmarcCheckResult.not_available == SpamdResult.extract_from_headers(msg).dmarc
def test_dmarc_result_bad_policy():
msg = load_eml_file("dmarc_bad_policy.eml")
assert SpamdResult._get_from_message(msg) is None
assert DmarcCheckResult.bad_policy == SpamdResult.extract_from_headers(msg).dmarc
assert SpamdResult._get_from_message(msg) is not None

View file

@ -1,9 +1,13 @@
import random
from email.message import EmailMessage from email.message import EmailMessage
from typing import List
import pytest
from aiosmtpd.smtp import Envelope from aiosmtpd.smtp import Envelope
import email_handler import email_handler
from app.config import BOUNCE_EMAIL from app.config import BOUNCE_EMAIL, EMAIL_DOMAIN, ALERT_DMARC_FAILED_REPLY_PHASE
from app.db import Session
from app.email import headers, status from app.email import headers, status
from app.models import ( from app.models import (
User, User,
@ -12,6 +16,8 @@ from app.models import (
IgnoredEmail, IgnoredEmail,
EmailLog, EmailLog,
Notification, Notification,
Contact,
SentAlert,
) )
from email_handler import ( from email_handler import (
get_mailbox_from_mail_from, get_mailbox_from_mail_from,
@ -75,7 +81,7 @@ def test_is_automatic_out_of_office():
assert is_automatic_out_of_office(msg) assert is_automatic_out_of_office(msg)
def test_dmarc_quarantine(flask_client): def test_dmarc_forward_quarantine(flask_client):
user = create_random_user() user = create_random_user()
alias = Alias.create_new_random(user) alias = Alias.create_new_random(user)
msg = load_eml_file("dmarc_quarantine.eml", {"alias_email": alias.email}) msg = load_eml_file("dmarc_quarantine.eml", {"alias_email": alias.email})
@ -98,25 +104,18 @@ def test_dmarc_quarantine(flask_client):
assert f"{alias.email} has a new mail in quarantine" == notifications[0].title assert f"{alias.email} has a new mail in quarantine" == notifications[0].title
# todo: re-enable test when softfail is quarantined def test_gmail_dmarc_softfail(flask_client):
# def test_gmail_dmarc_softfail(flask_client): user = create_random_user()
# user = create_random_user() alias = Alias.create_new_random(user)
# alias = Alias.create_new_random(user) msg = load_eml_file("dmarc_gmail_softfail.eml", {"alias_email": alias.email})
# msg = load_eml_file("dmarc_gmail_softfail.eml", {"alias_email": alias.email}) envelope = Envelope()
# envelope = Envelope() envelope.mail_from = msg["from"]
# envelope.mail_from = msg["from"] envelope.rcpt_tos = [msg["to"]]
# envelope.rcpt_tos = [msg["to"]] result = email_handler.handle(envelope, msg)
# result = email_handler.handle(envelope, msg) assert result == status.E200
# assert result == status.E215 # Enable when we can verify that the actual message sent has this content
# email_logs = ( # payload = msg.get_payload()
# EmailLog.filter_by(user_id=user.id, alias_id=alias.id) # assert payload.find("failed anti-phishing checks") > -1
# .order_by(EmailLog.id.desc())
# .all()
# )
# assert len(email_logs) == 1
# email_log = email_logs[0]
# assert email_log.blocked
# assert email_log.refused_email_id
def test_prevent_5xx_from_spf(flask_client): def test_prevent_5xx_from_spf(flask_client):
@ -159,3 +158,39 @@ def test_preserve_5xx_with_no_header(flask_client):
envelope.rcpt_tos = [msg["to"]] envelope.rcpt_tos = [msg["to"]]
result = email_handler.MailHandler()._handle(envelope, msg) result = email_handler.MailHandler()._handle(envelope, msg)
assert result == status.E512 assert result == status.E512
def generate_dmarc_result() -> List:
return ["DMARC_POLICY_QUARANTINE", "DMARC_POLICY_REJECT", "DMARC_POLICY_SOFTFAIL"]
@pytest.mark.parametrize("dmarc_result", generate_dmarc_result())
def test_dmarc_reply_quarantine(flask_client, dmarc_result):
user = create_random_user()
alias = Alias.create_new_random(user)
Session.commit()
contact = Contact.create(
user_id=alias.user_id,
alias_id=alias.id,
website_email="random-{}@nowhere.net".format(int(random.random())),
name="Name {}".format(int(random.random())),
reply_email="random-{}@{}".format(random.random(), EMAIL_DOMAIN),
)
Session.commit()
msg = load_eml_file(
"dmarc_reply_check.eml",
{
"alias_email": alias.email,
"contact_email": contact.reply_email,
"dmarc_result": dmarc_result,
},
)
envelope = Envelope()
envelope.mail_from = msg["from"]
envelope.rcpt_tos = [msg["to"]]
result = email_handler.handle(envelope, msg)
assert result == status.E215
alerts = SentAlert.filter_by(
user_id=user.id, alert_type=ALERT_DMARC_FAILED_REPLY_PHASE
).all()
assert len(alerts) == 1

View file

@ -36,7 +36,6 @@ from app.email_utils import (
get_orig_message_from_bounce, get_orig_message_from_bounce,
get_mailbox_bounce_info, get_mailbox_bounce_info,
is_invalid_mailbox_domain, is_invalid_mailbox_domain,
get_spamd_result,
) )
from app.models import ( from app.models import (
User, User,
@ -46,7 +45,6 @@ from app.models import (
EmailLog, EmailLog,
IgnoreBounceSender, IgnoreBounceSender,
InvalidMailboxDomain, InvalidMailboxDomain,
DmarcCheckResult,
) )
# flake8: noqa: E101, W191 # flake8: noqa: E101, W191
@ -795,36 +793,6 @@ def test_is_invalid_mailbox_domain(flask_client):
assert not is_invalid_mailbox_domain("xy.zt") assert not is_invalid_mailbox_domain("xy.zt")
def test_dmarc_result_softfail():
msg = load_eml_file("dmarc_gmail_softfail.eml")
assert DmarcCheckResult.soft_fail == get_spamd_result(msg).dmarc
def test_dmarc_result_quarantine():
msg = load_eml_file("dmarc_quarantine.eml")
assert DmarcCheckResult.quarantine == get_spamd_result(msg).dmarc
def test_dmarc_result_reject():
msg = load_eml_file("dmarc_reject.eml")
assert DmarcCheckResult.reject == get_spamd_result(msg).dmarc
def test_dmarc_result_allow():
msg = load_eml_file("dmarc_allow.eml")
assert DmarcCheckResult.allow == get_spamd_result(msg).dmarc
def test_dmarc_result_na():
msg = load_eml_file("dmarc_na.eml")
assert DmarcCheckResult.not_available == get_spamd_result(msg).dmarc
def test_dmarc_result_bad_policy():
msg = load_eml_file("dmarc_bad_policy.eml")
assert DmarcCheckResult.bad_policy == get_spamd_result(msg).dmarc
def test_add_header_multipart_with_invalid_part(): def test_add_header_multipart_with_invalid_part():
msg = load_eml_file("multipart_alternative.eml") msg = load_eml_file("multipart_alternative.eml")
parts = msg.get_payload() + ["invalid"] parts = msg.get_payload() + ["invalid"]

View file

@ -39,11 +39,11 @@ def random_token(length: int = 10) -> str:
def create_random_user() -> User: def create_random_user() -> User:
email = "{}@{}.com".format(random_token(), random_token()) random_email = "{}@{}.com".format(random_token(), random_token())
return User.create( return User.create(
email=email, email=random_email,
password="password", password="password",
name="Test User", name="Test {}".format(random_token()),
activated=True, activated=True,
commit=True, commit=True,
) )