From 112b2c77c317db392fc37c21f1916f64ae0fd7fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adri=C3=A0=20Casaj=C3=BAs?= Date: Thu, 21 Apr 2022 11:30:39 +0200 Subject: [PATCH] Add backwards compat with shake128 signed verp emails --- app/email_utils.py | 36 +++++++++++++++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/app/email_utils.py b/app/email_utils.py index 5242280f..cba5dc7a 100644 --- a/app/email_utils.py +++ b/app/email_utils.py @@ -1496,9 +1496,36 @@ def generate_verp_email( ).lower() +# Remove this method after 2022-05-20. Just for backwards compat. +def deprecated_get_verp_info_from_email(email: str) -> Optional[Tuple[VerpType, int]]: + idx = email.find("@") + if idx == -1: + return None + username = email[:idx] + fields = username.split(".") + if len(fields) != 3 or fields[0] != VERP_PREFIX: + return None + padding = (8 - (len(fields[1]) % 8)) % 8 + payload = base64.b32decode(fields[1].encode("utf-8").upper() + (b"=" * padding)) + padding = (8 - (len(fields[2]) % 8)) % 8 + signature = base64.b32decode(fields[2].encode("utf-8").upper() + (b"=" * padding)) + expected_signature = hmac.new( + VERP_EMAIL_SECRET.encode("utf-8"), payload, "shake128" + ).digest() + if expected_signature != signature: + return None + data = json.loads(payload) + # verp type, object_id, time + if len(data) != 3: + return None + if data[2] > time.time() + VERP_MESSAGE_LIFETIME: + return None + return VerpType(data[0]), data[1] + + # This method processes the email address, checks if it's a signed verp email generated by us to receive bounces # and extracts the type of verp email and associated email log id/transactional email id stored as object_id -def get_verp_info_from_email(email: str) -> Optional[Tuple[VerpType, int]]: +def new_get_verp_info_from_email(email: str) -> Optional[Tuple[VerpType, int]]: idx = email.find("@") if idx == -1: return None @@ -1522,3 +1549,10 @@ def get_verp_info_from_email(email: str) -> Optional[Tuple[VerpType, int]]: if data[2] > (time.time() + VERP_MESSAGE_LIFETIME - VERP_TIME_START) / 60: return None return VerpType(data[0]), data[1] + + +# Replace with new_get_verp_info_from_email when deprecated_get_verp_info_from_email is removed +def get_verp_info_from_email(email: str) -> Optional[Tuple[VerpType, int]]: + return new_get_verp_info_from_email(email) or deprecated_get_verp_info_from_email( + email + )