diff --git a/app/auth/views/fido.py b/app/auth/views/fido.py index 4258870f..eba5ea5c 100644 --- a/app/auth/views/fido.py +++ b/app/auth/views/fido.py @@ -23,6 +23,7 @@ from app.db import Session from app.extensions import limiter from app.log import LOG from app.models import User, Fido, MfaBrowser +from app.utils import sanitize_next_url class FidoTokenForm(FlaskForm): @@ -54,7 +55,7 @@ def fido(): auto_activate = True fido_token_form = FidoTokenForm() - next_url = request.args.get("next") + next_url = sanitize_next_url(request.args.get("next")) if request.cookies.get("mfa"): browser = MfaBrowser.get_by(token=request.cookies.get("mfa")) diff --git a/app/auth/views/github.py b/app/auth/views/github.py index abc2df2b..3f272a32 100644 --- a/app/auth/views/github.py +++ b/app/auth/views/github.py @@ -7,7 +7,7 @@ from app.config import GITHUB_CLIENT_ID, GITHUB_CLIENT_SECRET, URL from app.db import Session from app.log import LOG from app.models import User, SocialAuth -from app.utils import encode_url, sanitize_email +from app.utils import encode_url, sanitize_email, sanitize_next_url _authorization_base_url = "https://github.com/login/oauth/authorize" _token_url = "https://github.com/login/oauth/access_token" @@ -19,7 +19,7 @@ _redirect_uri = URL + "/auth/github/callback" @auth_bp.route("/github/login") def github_login(): - next_url = request.args.get("next") + next_url = sanitize_next_url(request.args.get("next")) if next_url: redirect_uri = _redirect_uri + "?next=" + encode_url(next_url) else: @@ -97,6 +97,6 @@ def github_callback(): Session.commit() # The activation link contains the original page, for ex authorize page - next_url = request.args.get("next") if request.args else None + next_url = sanitize_next_url(request.args.get("next")) if request.args else None return after_login(user, next_url) diff --git a/app/auth/views/mfa.py b/app/auth/views/mfa.py index b9f0214a..af915134 100644 --- a/app/auth/views/mfa.py +++ b/app/auth/views/mfa.py @@ -19,6 +19,7 @@ from app.db import Session from app.email_utils import send_invalid_totp_login_email from app.extensions import limiter from app.models import User, MfaBrowser +from app.utils import sanitize_next_url class OtpTokenForm(FlaskForm): @@ -48,7 +49,7 @@ def mfa(): return redirect(url_for("auth.login")) otp_token_form = OtpTokenForm() - next_url = request.args.get("next") + next_url = sanitize_next_url(request.args.get("next")) if request.cookies.get("mfa"): browser = MfaBrowser.get_by(token=request.cookies.get("mfa")) diff --git a/app/auth/views/recovery.py b/app/auth/views/recovery.py index 31a9ebcd..9e620e9f 100644 --- a/app/auth/views/recovery.py +++ b/app/auth/views/recovery.py @@ -11,6 +11,7 @@ from app.email_utils import send_invalid_totp_login_email from app.extensions import limiter from app.log import LOG from app.models import User, RecoveryCode +from app.utils import sanitize_next_url class RecoveryForm(FlaskForm): @@ -37,7 +38,7 @@ def recovery_route(): return redirect(url_for("auth.login")) recovery_form = RecoveryForm() - next_url = request.args.get("next") + next_url = sanitize_next_url(request.args.get("next")) if recovery_form.validate_on_submit(): code = recovery_form.code.data diff --git a/app/dashboard/views/enter_sudo.py b/app/dashboard/views/enter_sudo.py index 6d388bdf..d45f5c00 100644 --- a/app/dashboard/views/enter_sudo.py +++ b/app/dashboard/views/enter_sudo.py @@ -8,6 +8,7 @@ from wtforms import PasswordField, validators from app.dashboard.base import dashboard_bp from app.log import LOG +from app.utils import sanitize_next_url _SUDO_GAP = 900 @@ -28,7 +29,7 @@ def enter_sudo(): session["sudo_time"] = int(time()) # User comes to sudo page from another page - next_url = request.args.get("next") + next_url = sanitize_next_url(request.args.get("next")) if next_url: LOG.d("redirect user to %s", next_url) return redirect(next_url) diff --git a/app/oauth/views/authorize.py b/app/oauth/views/authorize.py index 49941836..e4e9f60b 100644 --- a/app/oauth/views/authorize.py +++ b/app/oauth/views/authorize.py @@ -70,10 +70,7 @@ def authorize(): client = Client.get_by(oauth_client_id=oauth_client_id) if not client: - final_redirect_uri = ( - f"{redirect_uri}?error=invalid_client_id&client_id={oauth_client_id}" - ) - return redirect(final_redirect_uri) + return redirect(url_for("auth.login")) # check if redirect_uri is valid # allow localhost by default diff --git a/app/utils.py b/app/utils.py index 38fc96d6..b98d6a87 100644 --- a/app/utils.py +++ b/app/utils.py @@ -80,28 +80,17 @@ class NextUrlSanitizer: def sanitize(url: Optional[str], allowed_domains: List[str]) -> Optional[str]: if not url: return None - # Relative redirect - if url[0] == "/": - return url - return NextUrlSanitizer.__handle_absolute_redirect(url, allowed_domains) + result = urllib.parse.urlparse(url) + if result.hostname: + if result.hostname in allowed_domains: + return url + else: + return None + if result.path and result.path[0] == "/": + return result.path - @staticmethod - def __handle_absolute_redirect( - url: str, allowed_domains: List[str] - ) -> Optional[str]: - if not NextUrlSanitizer.__is_absolute_url(url): - # Unknown url, something like &next=something.example.com - return None - parsed = urllib.parse.urlparse(url) - if parsed.hostname in allowed_domains: - return url - # Not allowed domain return None - @staticmethod - def __is_absolute_url(url: str) -> bool: - return url.startswith(("http://", "https://")) - def sanitize_next_url(url: Optional[str]) -> Optional[str]: return NextUrlSanitizer.sanitize(url, ALLOWED_REDIRECT_DOMAINS) diff --git a/tests/oauth/test_authorize.py b/tests/oauth/test_authorize.py index 82f1e7e0..fed7a708 100644 --- a/tests/oauth/test_authorize.py +++ b/tests/oauth/test_authorize.py @@ -4,6 +4,7 @@ from urllib.parse import urlparse, parse_qs from flask import url_for +from app.config import ALLOWED_REDIRECT_DOMAINS from app.db import Session from app.jose_utils import verify_id_token, decode_id_token from app.models import Client, User, ClientUser @@ -49,7 +50,7 @@ def test_authorize_page_non_login_user(flask_client): "oauth.authorize", client_id=client.oauth_client_id, state="teststate", - redirect_uri="http://localhost", + redirect_uri=f"https://{ALLOWED_REDIRECT_DOMAINS[0]}", response_type="code", ) ) @@ -109,7 +110,7 @@ def test_authorize_page_login_user(flask_client): "oauth.authorize", client_id=client.oauth_client_id, state="teststate", - redirect_uri="http://localhost", + redirect_uri=f"https://{ALLOWED_REDIRECT_DOMAINS[0]}", response_type="code", ) ) @@ -136,7 +137,7 @@ def test_authorize_code_flow_no_openid_scope(flask_client): "oauth.authorize", client_id=client.oauth_client_id, state="teststate", - redirect_uri="http://localhost", + redirect_uri=f"https://{ALLOWED_REDIRECT_DOMAINS[0]}", response_type="code", ), data={"button": "allow", "suggested-email": "x@y.z", "suggested-name": "AB CD"}, @@ -149,7 +150,7 @@ def test_authorize_code_flow_no_openid_scope(flask_client): # r.location will have this form http://localhost?state=teststate&code=knuyjepwvg o = urlparse(r.location) - assert o.netloc == "localhost" + assert o.netloc == ALLOWED_REDIRECT_DOMAINS[0] assert not o.fragment # parse the query, should return something like @@ -225,7 +226,7 @@ def test_authorize_code_flow_with_openid_scope(flask_client): "oauth.authorize", client_id=client.oauth_client_id, state="teststate", - redirect_uri="http://localhost", + redirect_uri=f"https://{ALLOWED_REDIRECT_DOMAINS[0]}", response_type="code", scope="openid", # openid is in scope ), @@ -239,7 +240,7 @@ def test_authorize_code_flow_with_openid_scope(flask_client): # r.location will have this form http://localhost?state=teststate&code=knuyjepwvg o = urlparse(r.location) - assert o.netloc == "localhost" + assert o.netloc == ALLOWED_REDIRECT_DOMAINS[0] assert not o.fragment # parse the query, should return something like @@ -318,7 +319,7 @@ def test_authorize_token_flow(flask_client): "oauth.authorize", client_id=client.oauth_client_id, state="teststate", - redirect_uri="http://localhost", + redirect_uri=f"https://{ALLOWED_REDIRECT_DOMAINS[0]}", response_type="token", # token flow ), data={"button": "allow", "suggested-email": "x@y.z", "suggested-name": "AB CD"}, @@ -331,7 +332,7 @@ def test_authorize_token_flow(flask_client): # r.location will have this form http://localhost?state=teststate&code=knuyjepwvg o = urlparse(r.location) - assert o.netloc == "localhost" + assert o.netloc == ALLOWED_REDIRECT_DOMAINS[0] # in token flow, access_token is in fragment and not query assert o.fragment @@ -365,7 +366,7 @@ def test_authorize_id_token_flow(flask_client): "oauth.authorize", client_id=client.oauth_client_id, state="teststate", - redirect_uri="http://localhost", + redirect_uri=f"https://{ALLOWED_REDIRECT_DOMAINS[0]}", response_type="id_token", # id_token flow ), data={"button": "allow", "suggested-email": "x@y.z", "suggested-name": "AB CD"}, @@ -378,7 +379,7 @@ def test_authorize_id_token_flow(flask_client): # r.location will have this form http://localhost?state=teststate&code=knuyjepwvg o = urlparse(r.location) - assert o.netloc == "localhost" + assert o.netloc == ALLOWED_REDIRECT_DOMAINS[0] assert not o.fragment assert o.query @@ -414,7 +415,7 @@ def test_authorize_token_id_token_flow(flask_client): "oauth.authorize", client_id=client.oauth_client_id, state="teststate", - redirect_uri="http://localhost", + redirect_uri=f"https://{ALLOWED_REDIRECT_DOMAINS[0]}", response_type="id_token token", # id_token,token flow ), data={"button": "allow", "suggested-email": "x@y.z", "suggested-name": "AB CD"}, @@ -427,7 +428,7 @@ def test_authorize_token_id_token_flow(flask_client): # r.location will have this form http://localhost?state=teststate&code=knuyjepwvg o = urlparse(r.location) - assert o.netloc == "localhost" + assert o.netloc == ALLOWED_REDIRECT_DOMAINS[0] assert o.fragment assert not o.query @@ -504,7 +505,7 @@ def test_authorize_code_id_token_flow(flask_client): "oauth.authorize", client_id=client.oauth_client_id, state="teststate", - redirect_uri="http://localhost", + redirect_uri=f"https://{ALLOWED_REDIRECT_DOMAINS[0]}", response_type="id_token code", # id_token,code flow ), data={"button": "allow", "suggested-email": "x@y.z", "suggested-name": "AB CD"}, @@ -517,7 +518,7 @@ def test_authorize_code_id_token_flow(flask_client): # r.location will have this form http://localhost?state=teststate&code=knuyjepwvg o = urlparse(r.location) - assert o.netloc == "localhost" + assert o.netloc == ALLOWED_REDIRECT_DOMAINS[0] assert not o.fragment assert o.query @@ -642,10 +643,7 @@ def test_authorize_page_invalid_client_id(flask_client): ) assert r.status_code == 302 - assert ( - r.location - == "http://localhost?error=invalid_client_id&client_id=invalid_client_id" - ) + assert r.location == url_for("auth.login") def test_authorize_page_http_not_allowed(flask_client): diff --git a/tests/test_utils.py b/tests/test_utils.py index a02137ad..ef7605d4 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,3 +1,7 @@ +from typing import List + +import pytest + from app.config import ALLOWED_REDIRECT_DOMAINS from app.utils import random_string, random_words, sanitize_next_url @@ -12,22 +16,27 @@ def test_random_string(): assert len(s) > 0 -def test_sanitize_url(): +def generate_sanitize_url_cases() -> List: cases = [ - {"url": None, "expected": None}, - {"url": "", "expected": None}, - {"url": "http://unknown.domain", "expected": None}, - {"url": "https://badzone.org", "expected": None}, - {"url": "/", "expected": "/"}, - {"url": "/auth", "expected": "/auth"}, - {"url": "/some/path", "expected": "/some/path"}, + [None, None], + ["", None], + ["http://badhttp.com", None], + ["https://badhttps.com", None], + ["/", "/"], + ["/auth", "/auth"], + ["/some/path", "/some/path"], + ["//somewhere.net", None], ] - for domain in ALLOWED_REDIRECT_DOMAINS: - cases.append({"url": f"http://{domain}", "expected": f"http://{domain}"}) - cases.append({"url": f"https://{domain}", "expected": f"https://{domain}"}) - cases.append({"url": domain, "expected": None}) + cases.append([f"http://{domain}", f"http://{domain}"]) + cases.append([f"https://{domain}", f"https://{domain}"]) + cases.append([f"https://{domain}/sub", f"https://{domain}/sub"]) + cases.append([domain, None]) + cases.append([f"//{domain}", f"//{domain}"]) + return cases - for case in cases: - res = sanitize_next_url(case["url"]) - assert res == case["expected"] + +@pytest.mark.parametrize("url,expected", generate_sanitize_url_cases()) +def test_sanitize_url(url, expected): + sanitized = sanitize_next_url(url) + assert expected == sanitized