Merge pull request #862 from simple-login/ac/sanitize-next

Properly validate //host.com urls
This commit is contained in:
Son Nguyen Kim 2022-03-30 19:40:36 +07:00 committed by GitHub
commit f7a98bc7d2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 60 additions and 63 deletions

View file

@ -23,6 +23,7 @@ from app.db import Session
from app.extensions import limiter from app.extensions import limiter
from app.log import LOG from app.log import LOG
from app.models import User, Fido, MfaBrowser from app.models import User, Fido, MfaBrowser
from app.utils import sanitize_next_url
class FidoTokenForm(FlaskForm): class FidoTokenForm(FlaskForm):
@ -54,7 +55,7 @@ def fido():
auto_activate = True auto_activate = True
fido_token_form = FidoTokenForm() fido_token_form = FidoTokenForm()
next_url = request.args.get("next") next_url = sanitize_next_url(request.args.get("next"))
if request.cookies.get("mfa"): if request.cookies.get("mfa"):
browser = MfaBrowser.get_by(token=request.cookies.get("mfa")) browser = MfaBrowser.get_by(token=request.cookies.get("mfa"))

View file

@ -7,7 +7,7 @@ from app.config import GITHUB_CLIENT_ID, GITHUB_CLIENT_SECRET, URL
from app.db import Session from app.db import Session
from app.log import LOG from app.log import LOG
from app.models import User, SocialAuth from app.models import User, SocialAuth
from app.utils import encode_url, sanitize_email from app.utils import encode_url, sanitize_email, sanitize_next_url
_authorization_base_url = "https://github.com/login/oauth/authorize" _authorization_base_url = "https://github.com/login/oauth/authorize"
_token_url = "https://github.com/login/oauth/access_token" _token_url = "https://github.com/login/oauth/access_token"
@ -19,7 +19,7 @@ _redirect_uri = URL + "/auth/github/callback"
@auth_bp.route("/github/login") @auth_bp.route("/github/login")
def github_login(): def github_login():
next_url = request.args.get("next") next_url = sanitize_next_url(request.args.get("next"))
if next_url: if next_url:
redirect_uri = _redirect_uri + "?next=" + encode_url(next_url) redirect_uri = _redirect_uri + "?next=" + encode_url(next_url)
else: else:
@ -97,6 +97,6 @@ def github_callback():
Session.commit() Session.commit()
# The activation link contains the original page, for ex authorize page # The activation link contains the original page, for ex authorize page
next_url = request.args.get("next") if request.args else None next_url = sanitize_next_url(request.args.get("next")) if request.args else None
return after_login(user, next_url) return after_login(user, next_url)

View file

@ -19,6 +19,7 @@ from app.db import Session
from app.email_utils import send_invalid_totp_login_email from app.email_utils import send_invalid_totp_login_email
from app.extensions import limiter from app.extensions import limiter
from app.models import User, MfaBrowser from app.models import User, MfaBrowser
from app.utils import sanitize_next_url
class OtpTokenForm(FlaskForm): class OtpTokenForm(FlaskForm):
@ -48,7 +49,7 @@ def mfa():
return redirect(url_for("auth.login")) return redirect(url_for("auth.login"))
otp_token_form = OtpTokenForm() otp_token_form = OtpTokenForm()
next_url = request.args.get("next") next_url = sanitize_next_url(request.args.get("next"))
if request.cookies.get("mfa"): if request.cookies.get("mfa"):
browser = MfaBrowser.get_by(token=request.cookies.get("mfa")) browser = MfaBrowser.get_by(token=request.cookies.get("mfa"))

View file

@ -11,6 +11,7 @@ from app.email_utils import send_invalid_totp_login_email
from app.extensions import limiter from app.extensions import limiter
from app.log import LOG from app.log import LOG
from app.models import User, RecoveryCode from app.models import User, RecoveryCode
from app.utils import sanitize_next_url
class RecoveryForm(FlaskForm): class RecoveryForm(FlaskForm):
@ -37,7 +38,7 @@ def recovery_route():
return redirect(url_for("auth.login")) return redirect(url_for("auth.login"))
recovery_form = RecoveryForm() recovery_form = RecoveryForm()
next_url = request.args.get("next") next_url = sanitize_next_url(request.args.get("next"))
if recovery_form.validate_on_submit(): if recovery_form.validate_on_submit():
code = recovery_form.code.data code = recovery_form.code.data

View file

@ -8,6 +8,7 @@ from wtforms import PasswordField, validators
from app.dashboard.base import dashboard_bp from app.dashboard.base import dashboard_bp
from app.log import LOG from app.log import LOG
from app.utils import sanitize_next_url
_SUDO_GAP = 900 _SUDO_GAP = 900
@ -28,7 +29,7 @@ def enter_sudo():
session["sudo_time"] = int(time()) session["sudo_time"] = int(time())
# User comes to sudo page from another page # User comes to sudo page from another page
next_url = request.args.get("next") next_url = sanitize_next_url(request.args.get("next"))
if next_url: if next_url:
LOG.d("redirect user to %s", next_url) LOG.d("redirect user to %s", next_url)
return redirect(next_url) return redirect(next_url)

View file

@ -70,10 +70,7 @@ def authorize():
client = Client.get_by(oauth_client_id=oauth_client_id) client = Client.get_by(oauth_client_id=oauth_client_id)
if not client: if not client:
final_redirect_uri = ( return redirect(url_for("auth.login"))
f"{redirect_uri}?error=invalid_client_id&client_id={oauth_client_id}"
)
return redirect(final_redirect_uri)
# check if redirect_uri is valid # check if redirect_uri is valid
# allow localhost by default # allow localhost by default

View file

@ -80,27 +80,16 @@ class NextUrlSanitizer:
def sanitize(url: Optional[str], allowed_domains: List[str]) -> Optional[str]: def sanitize(url: Optional[str], allowed_domains: List[str]) -> Optional[str]:
if not url: if not url:
return None return None
# Relative redirect result = urllib.parse.urlparse(url)
if url[0] == "/": if result.hostname:
if result.hostname in allowed_domains:
return url return url
return NextUrlSanitizer.__handle_absolute_redirect(url, allowed_domains) else:
@staticmethod
def __handle_absolute_redirect(
url: str, allowed_domains: List[str]
) -> Optional[str]:
if not NextUrlSanitizer.__is_absolute_url(url):
# Unknown url, something like &next=something.example.com
return None
parsed = urllib.parse.urlparse(url)
if parsed.hostname in allowed_domains:
return url
# Not allowed domain
return None return None
if result.path and result.path[0] == "/":
return result.path
@staticmethod return None
def __is_absolute_url(url: str) -> bool:
return url.startswith(("http://", "https://"))
def sanitize_next_url(url: Optional[str]) -> Optional[str]: def sanitize_next_url(url: Optional[str]) -> Optional[str]:

View file

@ -4,6 +4,7 @@ from urllib.parse import urlparse, parse_qs
from flask import url_for from flask import url_for
from app.config import ALLOWED_REDIRECT_DOMAINS
from app.db import Session from app.db import Session
from app.jose_utils import verify_id_token, decode_id_token from app.jose_utils import verify_id_token, decode_id_token
from app.models import Client, User, ClientUser from app.models import Client, User, ClientUser
@ -49,7 +50,7 @@ def test_authorize_page_non_login_user(flask_client):
"oauth.authorize", "oauth.authorize",
client_id=client.oauth_client_id, client_id=client.oauth_client_id,
state="teststate", state="teststate",
redirect_uri="http://localhost", redirect_uri=f"https://{ALLOWED_REDIRECT_DOMAINS[0]}",
response_type="code", response_type="code",
) )
) )
@ -109,7 +110,7 @@ def test_authorize_page_login_user(flask_client):
"oauth.authorize", "oauth.authorize",
client_id=client.oauth_client_id, client_id=client.oauth_client_id,
state="teststate", state="teststate",
redirect_uri="http://localhost", redirect_uri=f"https://{ALLOWED_REDIRECT_DOMAINS[0]}",
response_type="code", response_type="code",
) )
) )
@ -136,7 +137,7 @@ def test_authorize_code_flow_no_openid_scope(flask_client):
"oauth.authorize", "oauth.authorize",
client_id=client.oauth_client_id, client_id=client.oauth_client_id,
state="teststate", state="teststate",
redirect_uri="http://localhost", redirect_uri=f"https://{ALLOWED_REDIRECT_DOMAINS[0]}",
response_type="code", response_type="code",
), ),
data={"button": "allow", "suggested-email": "x@y.z", "suggested-name": "AB CD"}, data={"button": "allow", "suggested-email": "x@y.z", "suggested-name": "AB CD"},
@ -149,7 +150,7 @@ def test_authorize_code_flow_no_openid_scope(flask_client):
# r.location will have this form http://localhost?state=teststate&code=knuyjepwvg # r.location will have this form http://localhost?state=teststate&code=knuyjepwvg
o = urlparse(r.location) o = urlparse(r.location)
assert o.netloc == "localhost" assert o.netloc == ALLOWED_REDIRECT_DOMAINS[0]
assert not o.fragment assert not o.fragment
# parse the query, should return something like # parse the query, should return something like
@ -225,7 +226,7 @@ def test_authorize_code_flow_with_openid_scope(flask_client):
"oauth.authorize", "oauth.authorize",
client_id=client.oauth_client_id, client_id=client.oauth_client_id,
state="teststate", state="teststate",
redirect_uri="http://localhost", redirect_uri=f"https://{ALLOWED_REDIRECT_DOMAINS[0]}",
response_type="code", response_type="code",
scope="openid", # openid is in scope scope="openid", # openid is in scope
), ),
@ -239,7 +240,7 @@ def test_authorize_code_flow_with_openid_scope(flask_client):
# r.location will have this form http://localhost?state=teststate&code=knuyjepwvg # r.location will have this form http://localhost?state=teststate&code=knuyjepwvg
o = urlparse(r.location) o = urlparse(r.location)
assert o.netloc == "localhost" assert o.netloc == ALLOWED_REDIRECT_DOMAINS[0]
assert not o.fragment assert not o.fragment
# parse the query, should return something like # parse the query, should return something like
@ -318,7 +319,7 @@ def test_authorize_token_flow(flask_client):
"oauth.authorize", "oauth.authorize",
client_id=client.oauth_client_id, client_id=client.oauth_client_id,
state="teststate", state="teststate",
redirect_uri="http://localhost", redirect_uri=f"https://{ALLOWED_REDIRECT_DOMAINS[0]}",
response_type="token", # token flow response_type="token", # token flow
), ),
data={"button": "allow", "suggested-email": "x@y.z", "suggested-name": "AB CD"}, data={"button": "allow", "suggested-email": "x@y.z", "suggested-name": "AB CD"},
@ -331,7 +332,7 @@ def test_authorize_token_flow(flask_client):
# r.location will have this form http://localhost?state=teststate&code=knuyjepwvg # r.location will have this form http://localhost?state=teststate&code=knuyjepwvg
o = urlparse(r.location) o = urlparse(r.location)
assert o.netloc == "localhost" assert o.netloc == ALLOWED_REDIRECT_DOMAINS[0]
# in token flow, access_token is in fragment and not query # in token flow, access_token is in fragment and not query
assert o.fragment assert o.fragment
@ -365,7 +366,7 @@ def test_authorize_id_token_flow(flask_client):
"oauth.authorize", "oauth.authorize",
client_id=client.oauth_client_id, client_id=client.oauth_client_id,
state="teststate", state="teststate",
redirect_uri="http://localhost", redirect_uri=f"https://{ALLOWED_REDIRECT_DOMAINS[0]}",
response_type="id_token", # id_token flow response_type="id_token", # id_token flow
), ),
data={"button": "allow", "suggested-email": "x@y.z", "suggested-name": "AB CD"}, data={"button": "allow", "suggested-email": "x@y.z", "suggested-name": "AB CD"},
@ -378,7 +379,7 @@ def test_authorize_id_token_flow(flask_client):
# r.location will have this form http://localhost?state=teststate&code=knuyjepwvg # r.location will have this form http://localhost?state=teststate&code=knuyjepwvg
o = urlparse(r.location) o = urlparse(r.location)
assert o.netloc == "localhost" assert o.netloc == ALLOWED_REDIRECT_DOMAINS[0]
assert not o.fragment assert not o.fragment
assert o.query assert o.query
@ -414,7 +415,7 @@ def test_authorize_token_id_token_flow(flask_client):
"oauth.authorize", "oauth.authorize",
client_id=client.oauth_client_id, client_id=client.oauth_client_id,
state="teststate", state="teststate",
redirect_uri="http://localhost", redirect_uri=f"https://{ALLOWED_REDIRECT_DOMAINS[0]}",
response_type="id_token token", # id_token,token flow response_type="id_token token", # id_token,token flow
), ),
data={"button": "allow", "suggested-email": "x@y.z", "suggested-name": "AB CD"}, data={"button": "allow", "suggested-email": "x@y.z", "suggested-name": "AB CD"},
@ -427,7 +428,7 @@ def test_authorize_token_id_token_flow(flask_client):
# r.location will have this form http://localhost?state=teststate&code=knuyjepwvg # r.location will have this form http://localhost?state=teststate&code=knuyjepwvg
o = urlparse(r.location) o = urlparse(r.location)
assert o.netloc == "localhost" assert o.netloc == ALLOWED_REDIRECT_DOMAINS[0]
assert o.fragment assert o.fragment
assert not o.query assert not o.query
@ -504,7 +505,7 @@ def test_authorize_code_id_token_flow(flask_client):
"oauth.authorize", "oauth.authorize",
client_id=client.oauth_client_id, client_id=client.oauth_client_id,
state="teststate", state="teststate",
redirect_uri="http://localhost", redirect_uri=f"https://{ALLOWED_REDIRECT_DOMAINS[0]}",
response_type="id_token code", # id_token,code flow response_type="id_token code", # id_token,code flow
), ),
data={"button": "allow", "suggested-email": "x@y.z", "suggested-name": "AB CD"}, data={"button": "allow", "suggested-email": "x@y.z", "suggested-name": "AB CD"},
@ -517,7 +518,7 @@ def test_authorize_code_id_token_flow(flask_client):
# r.location will have this form http://localhost?state=teststate&code=knuyjepwvg # r.location will have this form http://localhost?state=teststate&code=knuyjepwvg
o = urlparse(r.location) o = urlparse(r.location)
assert o.netloc == "localhost" assert o.netloc == ALLOWED_REDIRECT_DOMAINS[0]
assert not o.fragment assert not o.fragment
assert o.query assert o.query
@ -642,10 +643,7 @@ def test_authorize_page_invalid_client_id(flask_client):
) )
assert r.status_code == 302 assert r.status_code == 302
assert ( assert r.location == url_for("auth.login")
r.location
== "http://localhost?error=invalid_client_id&client_id=invalid_client_id"
)
def test_authorize_page_http_not_allowed(flask_client): def test_authorize_page_http_not_allowed(flask_client):

View file

@ -1,3 +1,7 @@
from typing import List
import pytest
from app.config import ALLOWED_REDIRECT_DOMAINS from app.config import ALLOWED_REDIRECT_DOMAINS
from app.utils import random_string, random_words, sanitize_next_url from app.utils import random_string, random_words, sanitize_next_url
@ -12,22 +16,27 @@ def test_random_string():
assert len(s) > 0 assert len(s) > 0
def test_sanitize_url(): def generate_sanitize_url_cases() -> List:
cases = [ cases = [
{"url": None, "expected": None}, [None, None],
{"url": "", "expected": None}, ["", None],
{"url": "http://unknown.domain", "expected": None}, ["http://badhttp.com", None],
{"url": "https://badzone.org", "expected": None}, ["https://badhttps.com", None],
{"url": "/", "expected": "/"}, ["/", "/"],
{"url": "/auth", "expected": "/auth"}, ["/auth", "/auth"],
{"url": "/some/path", "expected": "/some/path"}, ["/some/path", "/some/path"],
["//somewhere.net", None],
] ]
for domain in ALLOWED_REDIRECT_DOMAINS: for domain in ALLOWED_REDIRECT_DOMAINS:
cases.append({"url": f"http://{domain}", "expected": f"http://{domain}"}) cases.append([f"http://{domain}", f"http://{domain}"])
cases.append({"url": f"https://{domain}", "expected": f"https://{domain}"}) cases.append([f"https://{domain}", f"https://{domain}"])
cases.append({"url": domain, "expected": None}) cases.append([f"https://{domain}/sub", f"https://{domain}/sub"])
cases.append([domain, None])
cases.append([f"//{domain}", f"//{domain}"])
return cases
for case in cases:
res = sanitize_next_url(case["url"]) @pytest.mark.parametrize("url,expected", generate_sanitize_url_cases())
assert res == case["expected"] def test_sanitize_url(url, expected):
sanitized = sanitize_next_url(url)
assert expected == sanitized