Compare mx domains with priority order and not priority value
This commit is contained in:
parent
eeb24f594a
commit
f049da8c9a
|
@ -13,6 +13,7 @@ from app.dns_utils import (
|
||||||
get_spf_domain,
|
get_spf_domain,
|
||||||
get_txt_record,
|
get_txt_record,
|
||||||
get_cname_record,
|
get_cname_record,
|
||||||
|
is_mx_equivalent,
|
||||||
)
|
)
|
||||||
from app.log import LOG
|
from app.log import LOG
|
||||||
from app.models import (
|
from app.models import (
|
||||||
|
@ -77,7 +78,7 @@ def domain_detail_dns(custom_domain_id):
|
||||||
elif request.form.get("form-name") == "check-mx":
|
elif request.form.get("form-name") == "check-mx":
|
||||||
mx_domains = get_mx_domains(custom_domain.domain)
|
mx_domains = get_mx_domains(custom_domain.domain)
|
||||||
|
|
||||||
if sorted(mx_domains) != sorted(EMAIL_SERVERS_WITH_PRIORITY):
|
if is_mx_equivalent(mx_domains, EMAIL_SERVERS_WITH_PRIORITY):
|
||||||
flash("The MX record is not correctly set", "warning")
|
flash("The MX record is not correctly set", "warning")
|
||||||
|
|
||||||
mx_ok = False
|
mx_ok = False
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
from typing import Optional
|
from typing import Optional, List, Tuple
|
||||||
|
|
||||||
import dns.resolver
|
import dns.resolver
|
||||||
|
|
||||||
|
@ -95,3 +95,28 @@ def get_txt_record(hostname) -> [str]:
|
||||||
ret.append(record)
|
ret.append(record)
|
||||||
|
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
|
|
||||||
|
def is_mx_equivalent(
|
||||||
|
mx_domains: List[Tuple[int, str]], ref_mx_domains: List[Tuple[int, str]]
|
||||||
|
) -> bool:
|
||||||
|
"""
|
||||||
|
Compare mx_domains with ref_mx_domains to see if they are equivalent.
|
||||||
|
mx_domains and ref_mx_domains are list of (priority, domain)
|
||||||
|
|
||||||
|
The priority order is taken into account but not the priority number.
|
||||||
|
For example, [(1, domain1), (2, domain2)] is equivalent to [(10, domain1), (20, domain2)]
|
||||||
|
"""
|
||||||
|
mx_domains = sorted(mx_domains, key=lambda priority_domain: priority_domain[0])
|
||||||
|
ref_mx_domains = sorted(
|
||||||
|
ref_mx_domains, key=lambda priority_domain: priority_domain[0]
|
||||||
|
)
|
||||||
|
|
||||||
|
if len(mx_domains) != len(ref_mx_domains):
|
||||||
|
return False
|
||||||
|
|
||||||
|
for i in range(0, len(mx_domains)):
|
||||||
|
if mx_domains[i][1] != ref_mx_domains[i][1]:
|
||||||
|
return False
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
4
cron.py
4
cron.py
|
@ -23,7 +23,7 @@ from app.config import (
|
||||||
HIBP_SCAN_INTERVAL_DAYS,
|
HIBP_SCAN_INTERVAL_DAYS,
|
||||||
)
|
)
|
||||||
from app.db import Session
|
from app.db import Session
|
||||||
from app.dns_utils import get_mx_domains
|
from app.dns_utils import get_mx_domains, is_mx_equivalent
|
||||||
from app.email_utils import (
|
from app.email_utils import (
|
||||||
send_email,
|
send_email,
|
||||||
send_trial_end_soon_email,
|
send_trial_end_soon_email,
|
||||||
|
@ -695,7 +695,7 @@ def check_custom_domain():
|
||||||
for custom_domain in CustomDomain.filter_by(verified=True): # type: CustomDomain
|
for custom_domain in CustomDomain.filter_by(verified=True): # type: CustomDomain
|
||||||
mx_domains = get_mx_domains(custom_domain.domain)
|
mx_domains = get_mx_domains(custom_domain.domain)
|
||||||
|
|
||||||
if sorted(mx_domains) != sorted(EMAIL_SERVERS_WITH_PRIORITY):
|
if is_mx_equivalent(mx_domains, EMAIL_SERVERS_WITH_PRIORITY):
|
||||||
user = custom_domain.user
|
user = custom_domain.user
|
||||||
LOG.w(
|
LOG.w(
|
||||||
"The MX record is not correctly set for %s %s %s",
|
"The MX record is not correctly set for %s %s %s",
|
||||||
|
|
|
@ -1,4 +1,9 @@
|
||||||
from app.dns_utils import get_mx_domains, get_spf_domain, get_txt_record
|
from app.dns_utils import (
|
||||||
|
get_mx_domains,
|
||||||
|
get_spf_domain,
|
||||||
|
get_txt_record,
|
||||||
|
is_mx_equivalent,
|
||||||
|
)
|
||||||
|
|
||||||
# use our own domain for test
|
# use our own domain for test
|
||||||
_DOMAIN = "simplelogin.io"
|
_DOMAIN = "simplelogin.io"
|
||||||
|
@ -22,3 +27,14 @@ def test_get_spf_domain():
|
||||||
def test_get_txt_record():
|
def test_get_txt_record():
|
||||||
r = get_txt_record(_DOMAIN)
|
r = get_txt_record(_DOMAIN)
|
||||||
assert len(r) > 0
|
assert len(r) > 0
|
||||||
|
|
||||||
|
|
||||||
|
def test_is_mx_equivalent():
|
||||||
|
assert is_mx_equivalent([], [])
|
||||||
|
assert is_mx_equivalent([(1, "domain")], [(1, "domain")])
|
||||||
|
assert is_mx_equivalent(
|
||||||
|
[(10, "domain1"), (20, "domain2")], [(10, "domain1"), (20, "domain2")]
|
||||||
|
)
|
||||||
|
assert is_mx_equivalent(
|
||||||
|
[(5, "domain1"), (10, "domain2")], [(10, "domain1"), (20, "domain2")]
|
||||||
|
)
|
||||||
|
|
Loading…
Reference in a new issue