simple-login/app/dns_utils.py

121 lines
3.1 KiB
Python
Raw Normal View History

2022-02-24 14:05:05 +00:00
from app import config
from typing import Optional, List, Tuple
2019-12-02 00:13:39 +00:00
import dns.resolver
def _get_dns_resolver():
my_resolver = dns.resolver.Resolver()
2022-02-24 14:05:05 +00:00
my_resolver.nameservers = config.NAMESERVERS
return my_resolver
2020-11-16 18:16:06 +00:00
def get_ns(hostname) -> [str]:
try:
2022-02-24 14:05:05 +00:00
answers = _get_dns_resolver().resolve(hostname, "NS", search=True)
2020-12-06 16:59:07 +00:00
except Exception:
2020-11-16 18:16:06 +00:00
return []
return [a.to_text() for a in answers]
def get_cname_record(hostname) -> Optional[str]:
2020-05-03 10:48:42 +00:00
"""Return the CNAME record if exists for a domain, WITHOUT the trailing period at the end"""
try:
2022-02-24 14:05:05 +00:00
answers = _get_dns_resolver().resolve(hostname, "CNAME", search=True)
except Exception:
return None
for a in answers:
2020-05-03 10:48:42 +00:00
ret = a.to_text()
return ret[:-1]
return None
def get_mx_domains(hostname) -> [(int, str)]:
"""return list of (priority, domain name).
domain name ends with a "." at the end.
2019-12-27 22:36:13 +00:00
"""
2019-12-07 22:42:57 +00:00
try:
2022-02-24 14:05:05 +00:00
answers = _get_dns_resolver().resolve(hostname, "MX", search=True)
except Exception:
2019-12-07 22:42:57 +00:00
return []
2019-12-02 00:13:39 +00:00
ret = []
for a in answers:
record = a.to_text() # for ex '20 alt2.aspmx.l.google.com.'
parts = record.split(" ")
2019-12-27 22:36:13 +00:00
ret.append((int(parts[0]), parts[1]))
2019-12-02 00:13:39 +00:00
return ret
2019-12-06 10:54:01 +00:00
_include_spf = "include:"
def get_spf_domain(hostname) -> [str]:
"""return all domains listed in *include:*"""
try:
2022-02-24 14:05:05 +00:00
answers = _get_dns_resolver().resolve(hostname, "TXT", search=True)
2020-01-02 21:15:08 +00:00
except Exception:
2019-12-06 10:54:01 +00:00
return []
ret = []
for a in answers: # type: dns.rdtypes.ANY.TXT.TXT
for record in a.strings:
record = record.decode() # record is bytes
if record.startswith("v=spf1"):
parts = record.split(" ")
for part in parts:
if part.startswith(_include_spf):
2020-12-06 21:11:58 +00:00
ret.append(part[part.find(_include_spf) + len(_include_spf) :])
2019-12-06 10:54:01 +00:00
return ret
2019-12-27 22:36:13 +00:00
def get_txt_record(hostname) -> [str]:
try:
2022-02-24 14:05:05 +00:00
answers = _get_dns_resolver().resolve(hostname, "TXT", search=True)
2020-01-02 21:15:08 +00:00
except Exception:
2019-12-27 22:36:13 +00:00
return []
ret = []
for a in answers: # type: dns.rdtypes.ANY.TXT.TXT
for record in a.strings:
record = record.decode() # record is bytes
ret.append(record)
2020-05-03 10:01:31 +00:00
return ret
def is_mx_equivalent(
mx_domains: List[Tuple[int, str]], ref_mx_domains: List[Tuple[int, str]]
) -> bool:
"""
Compare mx_domains with ref_mx_domains to see if they are equivalent.
mx_domains and ref_mx_domains are list of (priority, domain)
The priority order is taken into account but not the priority number.
For example, [(1, domain1), (2, domain2)] is equivalent to [(10, domain1), (20, domain2)]
"""
mx_domains = sorted(mx_domains, key=lambda priority_domain: priority_domain[0])
ref_mx_domains = sorted(
ref_mx_domains, key=lambda priority_domain: priority_domain[0]
)
if len(mx_domains) < len(ref_mx_domains):
return False
for i in range(0, len(ref_mx_domains)):
if mx_domains[i][1] != ref_mx_domains[i][1]:
return False
return True