mirror of
https://github.com/pretix/pretix.git
synced 2026-05-03 14:54:04 +00:00
OIDC: Implement PKCE in OP and RP
This commit is contained in:
@@ -236,7 +236,8 @@ def provider(organizer):
|
||||
"response_modes_supported": ["query"],
|
||||
"grant_types_supported": ["authorization_code"],
|
||||
"scopes_supported": ["openid", "email", "profile"],
|
||||
"claims_supported": ["email", "sub"]
|
||||
"claims_supported": ["email", "sub"],
|
||||
"code_challenge_methods_supported": ["plain", "S256"],
|
||||
}
|
||||
}
|
||||
)
|
||||
@@ -244,6 +245,21 @@ def provider(organizer):
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_authorize_url(provider):
|
||||
assert (
|
||||
"https://example.com/authorize?"
|
||||
"response_type=code&"
|
||||
"client_id=abc123&"
|
||||
"scope=openid+email+profile&"
|
||||
"state=state_val&"
|
||||
"redirect_uri=https%3A%2F%2Fredirect%3Ffoo%3Dbar&"
|
||||
"code_challenge=S1ZnvzwMZHrWOO62nENdJ6jhODhf7VfyZFBIXQyrTKo&"
|
||||
"code_challenge_method=S256"
|
||||
) == oidc_authorize_url(provider, "state_val", "https://redirect?foo=bar", "pkce_value")
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_authorize_url_no_pkce(provider):
|
||||
del provider.configuration["provider_config"]["code_challenge_methods_supported"]
|
||||
assert (
|
||||
"https://example.com/authorize?"
|
||||
"response_type=code&"
|
||||
@@ -251,7 +267,7 @@ def test_authorize_url(provider):
|
||||
"scope=openid+email+profile&"
|
||||
"state=state_val&"
|
||||
"redirect_uri=https%3A%2F%2Fredirect%3Ffoo%3Dbar"
|
||||
) == oidc_authorize_url(provider, "state_val", "https://redirect?foo=bar")
|
||||
) == oidc_authorize_url(provider, "state_val", "https://redirect?foo=bar", "pkce_value")
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@@ -264,7 +280,7 @@ def test_validate_authorization_invalid(provider):
|
||||
status=400,
|
||||
)
|
||||
with pytest.raises(ValidationError):
|
||||
oidc_validate_authorization(provider, "code_received", "https://redirect?foo=bar")
|
||||
oidc_validate_authorization(provider, "code_received", "https://redirect?foo=bar", "pkce_value")
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@@ -281,6 +297,7 @@ def test_validate_authorization_userinfo_invalid(provider):
|
||||
"grant_type": "authorization_code",
|
||||
"code": "code_received",
|
||||
"redirect_uri": "https://redirect?foo=bar",
|
||||
"code_verifier": "pkce_value",
|
||||
})
|
||||
],
|
||||
)
|
||||
@@ -296,7 +313,7 @@ def test_validate_authorization_userinfo_invalid(provider):
|
||||
],
|
||||
)
|
||||
with pytest.raises(ValidationError) as e:
|
||||
oidc_validate_authorization(provider, "code_received", "https://redirect?foo=bar")
|
||||
oidc_validate_authorization(provider, "code_received", "https://redirect?foo=bar", "pkce_value")
|
||||
assert 'could not fetch' in str(e.value)
|
||||
|
||||
|
||||
@@ -314,6 +331,7 @@ def test_validate_authorization_valid(provider):
|
||||
"grant_type": "authorization_code",
|
||||
"code": "code_received",
|
||||
"redirect_uri": "https://redirect?foo=bar",
|
||||
"code_verifier": "pkce_value",
|
||||
})
|
||||
],
|
||||
)
|
||||
@@ -328,4 +346,4 @@ def test_validate_authorization_valid(provider):
|
||||
matchers.header_matcher({"Authorization": "Bearer test_access_token"})
|
||||
],
|
||||
)
|
||||
oidc_validate_authorization(provider, "code_received", "https://redirect?foo=bar")
|
||||
oidc_validate_authorization(provider, "code_received", "https://redirect?foo=bar", "pkce_value")
|
||||
|
||||
@@ -208,6 +208,37 @@ def test_authorize_with_prompt_none(env, client, ssoclient):
|
||||
assert re.match(r'https://example.net\?code=([a-z0-9A-Z]{64})&state=STATE', r.headers['Location'])
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_authorize_with_invalid_pkce_method(env, client, ssoclient):
|
||||
url = f'/bigevents/oauth2/v1/authorize?' \
|
||||
f'client_id={ssoclient[0].client_id}&' \
|
||||
f'redirect_uri=https://example.net&' \
|
||||
f'response_type=code&state=STATE&scope=openid+profile&' \
|
||||
f'code_challenge=pkce_value&code_challenge_method=plain'
|
||||
r = client.get(url)
|
||||
assert r.status_code == 302
|
||||
assert r.headers['Location'] == 'https://example.net?' \
|
||||
'error=invalid_request&' \
|
||||
'error_description=code_challenge+transform+algorithm+not+supported&' \
|
||||
'state=STATE'
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_authorize_with_missing_pkce_if_required(env, client, ssoclient):
|
||||
ssoclient[0].require_pkce = True
|
||||
ssoclient[0].save()
|
||||
url = f'/bigevents/oauth2/v1/authorize?' \
|
||||
f'client_id={ssoclient[0].client_id}&' \
|
||||
f'redirect_uri=https://example.net&' \
|
||||
f'response_type=code&state=STATE&scope=openid+profile'
|
||||
r = client.get(url)
|
||||
assert r.status_code == 302
|
||||
assert r.headers['Location'] == 'https://example.net?' \
|
||||
'error=invalid_request&' \
|
||||
'error_description=code_challenge+%28PKCE%29+required&' \
|
||||
'state=STATE'
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_authorize_require_login_if_prompt_requires_it_or_is_expired(env, client, ssoclient):
|
||||
with freeze_time("2021-04-10T11:00:00+02:00"):
|
||||
@@ -286,7 +317,7 @@ def test_token_require_client_id(env, client, ssoclient):
|
||||
assert b'unsupported_grant_type' in r.content
|
||||
|
||||
|
||||
def _authorization_step(client, ssoclient):
|
||||
def _authorization_step(client, ssoclient, code_challenge=None):
|
||||
r = client.post('/bigevents/account/login', {
|
||||
'email': 'john@example.org',
|
||||
'password': 'foo',
|
||||
@@ -299,6 +330,8 @@ def _authorization_step(client, ssoclient):
|
||||
f'client_id={ssoclient[0].client_id}&' \
|
||||
f'redirect_uri=https://example.net&' \
|
||||
f'response_type=code&state=STATE&scope=openid+profile+email+phone'
|
||||
if code_challenge:
|
||||
url += f'&code_challenge={code_challenge}&code_challenge_method=S256'
|
||||
r = client.get(url)
|
||||
assert r.status_code == 302
|
||||
m = re.match(r'https://example.net\?code=([a-z0-9A-Z]{64})&state=STATE', r.headers['Location'])
|
||||
@@ -373,6 +406,55 @@ def test_token_success(env, client, ssoclient):
|
||||
CustomerSSOAccessToken.objects.get(token=d['access_token']).expires < now()
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_token_pkce_required_if_used_in_authorization(env, client, ssoclient):
|
||||
code = _authorization_step(client, ssoclient, "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM")
|
||||
|
||||
r = client.post('/bigevents/oauth2/v1/token', {
|
||||
'grant_type': 'authorization_code',
|
||||
'code': code,
|
||||
'redirect_uri': 'https://example.net',
|
||||
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(f'{ssoclient[0].client_id}:{ssoclient[1]}'.encode()).decode())
|
||||
assert r.status_code == 400
|
||||
d = json.loads(r.content)
|
||||
assert d['error'] == 'invalid_grant'
|
||||
assert d['error_description'] == 'Missing of code_verifier'
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_token_pkce_incorrect(env, client, ssoclient):
|
||||
code = _authorization_step(client, ssoclient, "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM")
|
||||
|
||||
r = client.post('/bigevents/oauth2/v1/token', {
|
||||
'grant_type': 'authorization_code',
|
||||
'code': code,
|
||||
'redirect_uri': 'https://example.net',
|
||||
'code_verifier': "WRONG",
|
||||
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(f'{ssoclient[0].client_id}:{ssoclient[1]}'.encode()).decode())
|
||||
assert r.status_code == 400
|
||||
d = json.loads(r.content)
|
||||
assert d['error'] == 'invalid_grant'
|
||||
assert d['error_description'] == 'Mismatch of code_verifier with code_challenge'
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_token_success_pkce(env, client, ssoclient):
|
||||
# this is the sample from the actual RFC
|
||||
code_verifier = "dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk"
|
||||
code = _authorization_step(client, ssoclient, "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM")
|
||||
|
||||
r = client.post('/bigevents/oauth2/v1/token', {
|
||||
'grant_type': 'authorization_code',
|
||||
'code': code,
|
||||
'redirect_uri': 'https://example.net',
|
||||
'code_verifier': code_verifier,
|
||||
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(f'{ssoclient[0].client_id}:{ssoclient[1]}'.encode()).decode())
|
||||
print(r.content)
|
||||
assert r.status_code == 200
|
||||
d = json.loads(r.content)
|
||||
assert d['access_token']
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_scope_enforcement(env, client, ssoclient):
|
||||
ssoclient[0].allowed_scopes = ['openid', 'profile']
|
||||
|
||||
Reference in New Issue
Block a user