mirror of
https://github.com/pretix/pretix.git
synced 2025-12-05 21:32:28 +00:00
Compare commits
3 Commits
ocm-cancel
...
oidc-pkce
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a5beda77f8 | ||
|
|
ef7f212063 | ||
|
|
26a5eab47f |
@@ -148,7 +148,7 @@ def oidc_validate_and_complete_config(config):
|
||||
return config
|
||||
|
||||
|
||||
def oidc_authorize_url(provider, state, redirect_uri):
|
||||
def oidc_authorize_url(provider, state, redirect_uri, pkce_code_verifier):
|
||||
endpoint = provider.configuration['provider_config']['authorization_endpoint']
|
||||
params = {
|
||||
# https://datatracker.ietf.org/doc/html/rfc6749#section-4.1.1
|
||||
@@ -163,10 +163,14 @@ def oidc_authorize_url(provider, state, redirect_uri):
|
||||
if "query_parameters" in provider.configuration and provider.configuration["query_parameters"]:
|
||||
params.update(parse_qsl(provider.configuration["query_parameters"]))
|
||||
|
||||
if pkce_code_verifier and "S256" in provider.configuration['provider_config'].get('code_challenge_methods_supported', []):
|
||||
params["code_challenge"] = base64.urlsafe_b64encode(hashlib.sha256(pkce_code_verifier.encode()).digest()).decode().rstrip("=")
|
||||
params["code_challenge_method"] = "S256"
|
||||
|
||||
return endpoint + '?' + urlencode(params)
|
||||
|
||||
|
||||
def oidc_validate_authorization(provider, code, redirect_uri):
|
||||
def oidc_validate_authorization(provider, code, redirect_uri, pkce_code_verifier):
|
||||
endpoint = provider.configuration['provider_config']['token_endpoint']
|
||||
|
||||
# Wall of shame and RFC ignorant IDPs
|
||||
@@ -188,6 +192,9 @@ def oidc_validate_authorization(provider, code, redirect_uri):
|
||||
'redirect_uri': redirect_uri,
|
||||
}
|
||||
|
||||
if pkce_code_verifier and "S256" in provider.configuration['provider_config'].get('code_challenge_methods_supported', []):
|
||||
params["code_verifier"] = pkce_code_verifier
|
||||
|
||||
if token_endpoint_auth_method == 'client_secret_post':
|
||||
params['client_id'] = provider.configuration['client_id']
|
||||
params['client_secret'] = provider.configuration['client_secret']
|
||||
|
||||
@@ -0,0 +1,28 @@
|
||||
# Generated by Django 4.2.17 on 2025-02-07 16:42
|
||||
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
("pretixbase", "0276_item_hidden_if_item_available_mode"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AddField(
|
||||
model_name="customerssoclient",
|
||||
name="require_pkce",
|
||||
field=models.BooleanField(default=False),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name="customerssogrant",
|
||||
name="code_challenge",
|
||||
field=models.TextField(null=True),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name="customerssogrant",
|
||||
name="code_challenge_method",
|
||||
field=models.CharField(max_length=255, null=True),
|
||||
),
|
||||
]
|
||||
@@ -416,6 +416,10 @@ class CustomerSSOClient(LoggedModel):
|
||||
authorization_grant_type = models.CharField(
|
||||
max_length=32, choices=GRANT_TYPES, verbose_name=_("Grant type"), default=GRANT_AUTHORIZATION_CODE,
|
||||
)
|
||||
require_pkce = models.BooleanField(
|
||||
verbose_name=_("Require PKCE extension"),
|
||||
default=False,
|
||||
)
|
||||
redirect_uris = models.TextField(
|
||||
blank=False,
|
||||
verbose_name=_("Redirection URIs"),
|
||||
@@ -481,6 +485,8 @@ class CustomerSSOGrant(models.Model):
|
||||
expires = models.DateTimeField()
|
||||
redirect_uri = models.TextField()
|
||||
scope = models.TextField(blank=True)
|
||||
code_challenge = models.TextField(blank=True, null=True)
|
||||
code_challenge_method = models.CharField(max_length=255, blank=True, null=True)
|
||||
|
||||
|
||||
class CustomerSSOAccessToken(models.Model):
|
||||
|
||||
@@ -1113,7 +1113,7 @@ class SSOClientForm(I18nModelForm):
|
||||
class Meta:
|
||||
model = CustomerSSOClient
|
||||
fields = ['is_active', 'name', 'client_id', 'client_type', 'authorization_grant_type', 'redirect_uris',
|
||||
'allowed_scopes']
|
||||
'allowed_scopes', 'require_pkce']
|
||||
widgets = {
|
||||
'authorization_grant_type': forms.RadioSelect,
|
||||
'client_type': forms.RadioSelect,
|
||||
|
||||
@@ -676,6 +676,8 @@ class SSOLoginView(RedirectBackMixin, View):
|
||||
popup_origin = None
|
||||
|
||||
nonce = get_random_string(32)
|
||||
pkce_code_verifier = get_random_string(64)
|
||||
request.session[f'pretix_customerauth_{self.provider.pk}_pkce_code_verifier'] = pkce_code_verifier
|
||||
request.session[f'pretix_customerauth_{self.provider.pk}_nonce'] = nonce
|
||||
request.session[f'pretix_customerauth_{self.provider.pk}_popup_origin'] = popup_origin
|
||||
request.session[f'pretix_customerauth_{self.provider.pk}_cross_domain_requested'] = self.request.GET.get("request_cross_domain_customer_auth") == "true"
|
||||
@@ -684,7 +686,7 @@ class SSOLoginView(RedirectBackMixin, View):
|
||||
})
|
||||
|
||||
if self.provider.method == "oidc":
|
||||
return redirect_to_url(oidc_authorize_url(self.provider, f'{nonce}%{next_url}', redirect_uri))
|
||||
return redirect_to_url(oidc_authorize_url(self.provider, f'{nonce}%{next_url}', redirect_uri, pkce_code_verifier))
|
||||
else:
|
||||
raise Http404("Unknown SSO method.")
|
||||
|
||||
@@ -718,6 +720,7 @@ class SSOLoginReturnView(RedirectBackMixin, View):
|
||||
)
|
||||
return HttpResponseRedirect(redirect_to)
|
||||
r = super().dispatch(request, *args, **kwargs)
|
||||
request.session.pop(f'pretix_customerauth_{self.provider.pk}_pkce_code_verifier', None)
|
||||
request.session.pop(f'pretix_customerauth_{self.provider.pk}_nonce', None)
|
||||
request.session.pop(f'pretix_customerauth_{self.provider.pk}_popup_origin', None)
|
||||
request.session.pop(f'pretix_customerauth_{self.provider.pk}_cross_domain_requested', None)
|
||||
@@ -763,6 +766,7 @@ class SSOLoginReturnView(RedirectBackMixin, View):
|
||||
self.provider,
|
||||
request.GET.get('code'),
|
||||
redirect_uri,
|
||||
request.session.get(f'pretix_customerauth_{self.provider.pk}_pkce_code_verifier'),
|
||||
)
|
||||
except ValidationError as e:
|
||||
for msg in e:
|
||||
|
||||
@@ -72,6 +72,9 @@ We currently do not implement the following optional parts of the spec:
|
||||
We also implement the Discovery extension (without issuer discovery)
|
||||
as per https://openid.net/specs/openid-connect-discovery-1_0.html
|
||||
|
||||
We also implement the PKCE extension for OAuth:
|
||||
https://www.rfc-editor.org/rfc/rfc7636
|
||||
|
||||
The implementation passed the certification tests against the following profiles, but we did not
|
||||
acquire formal certification:
|
||||
|
||||
@@ -136,19 +139,22 @@ class AuthorizeView(View):
|
||||
self._construct_redirect_uri(redirect_uri, response_mode, qs)
|
||||
)
|
||||
|
||||
def _require_login(self, request, client, scope, redirect_uri, response_type, response_mode, state, nonce):
|
||||
def _require_login(self, request, client, scope, redirect_uri, response_type, response_mode, state, nonce,
|
||||
code_challenge, code_challenge_method):
|
||||
form = AuthenticationForm(data=request.POST if "login-email" in request.POST else None, request=request,
|
||||
prefix="login")
|
||||
if "login-email" in request.POST and form.is_valid():
|
||||
customer_login(request, form.get_customer())
|
||||
return self._success(client, scope, redirect_uri, response_type, response_mode, state, nonce, form.get_customer())
|
||||
return self._success(client, scope, redirect_uri, response_type, response_mode, state, nonce,
|
||||
code_challenge, code_challenge_method, form.get_customer())
|
||||
else:
|
||||
return render(request, 'pretixpresale/organizers/customer_login.html', {
|
||||
'providers': [],
|
||||
'form': form,
|
||||
})
|
||||
|
||||
def _success(self, client, scope, redirect_uri, response_type, response_mode, state, nonce, customer):
|
||||
def _success(self, client, scope, redirect_uri, response_type, response_mode, state, nonce, code_challenge,
|
||||
code_challenge_method, customer):
|
||||
response_type = response_type.split(' ')
|
||||
qs = {}
|
||||
id_token_kwargs = {}
|
||||
@@ -162,6 +168,8 @@ class AuthorizeView(View):
|
||||
expires=now() + timedelta(minutes=10),
|
||||
auth_time=get_customer_auth_time(self.request),
|
||||
nonce=nonce,
|
||||
code_challenge=code_challenge,
|
||||
code_challenge_method=code_challenge_method,
|
||||
)
|
||||
qs['code'] = grant.code
|
||||
id_token_kwargs['with_code'] = grant.code
|
||||
@@ -209,6 +217,8 @@ class AuthorizeView(View):
|
||||
prompt = request_data.get("prompt")
|
||||
response_type = request_data.get("response_type")
|
||||
scope = request_data.get("scope", "").split(" ")
|
||||
code_challenge = request_data.get("code_challenge")
|
||||
code_challenge_method = request_data.get("code_challenge_method")
|
||||
|
||||
if not client_id:
|
||||
return self._final_error("invalid_request", "client_id missing")
|
||||
@@ -247,6 +257,16 @@ class AuthorizeView(View):
|
||||
self._redirect_error("invalid_request", "id_token_hint currently not supported by this server",
|
||||
redirect_uri, response_mode, state)
|
||||
|
||||
if code_challenge and code_challenge_method != "S256":
|
||||
# "Clients re permitted to use "plain" only if they cannot support "S256" for some technical reason and
|
||||
# know via out-of-band configuration that the S256 MUST be implemented, plain is not mandatory."
|
||||
return self._redirect_error("invalid_request", "code_challenge transform algorithm not supported",
|
||||
redirect_uri, response_mode, state)
|
||||
|
||||
if client.require_pkce and not code_challenge:
|
||||
return self._redirect_error("invalid_request", "code_challenge (PKCE) required",
|
||||
redirect_uri, response_mode, state)
|
||||
|
||||
has_valid_session = bool(request.customer)
|
||||
if has_valid_session and max_age:
|
||||
try:
|
||||
@@ -261,9 +281,11 @@ class AuthorizeView(View):
|
||||
has_valid_session = False
|
||||
|
||||
if has_valid_session:
|
||||
return self._success(client, scope, redirect_uri, response_type, response_mode, state, nonce, request.customer)
|
||||
return self._success(client, scope, redirect_uri, response_type, response_mode, state, nonce, code_challenge,
|
||||
code_challenge_method, request.customer)
|
||||
else:
|
||||
return self._require_login(request, client, scope, redirect_uri, response_type, response_mode, state, nonce)
|
||||
return self._require_login(request, client, scope, redirect_uri, response_type, response_mode, state, nonce,
|
||||
code_challenge, code_challenge_method)
|
||||
|
||||
|
||||
class TokenView(View):
|
||||
@@ -361,6 +383,24 @@ class TokenView(View):
|
||||
"error_description": "Mismatch of redirect_uri"
|
||||
}, status=400)
|
||||
|
||||
if grant.code_challenge:
|
||||
if not request.POST.get("code_verifier"):
|
||||
return JsonResponse({
|
||||
"error": "invalid_grant",
|
||||
"error_description": "Missing of code_verifier"
|
||||
}, status=400)
|
||||
|
||||
if grant.code_challenge_method == "S256":
|
||||
expected_challenge = base64.urlsafe_b64encode(hashlib.sha256(request.POST["code_verifier"].encode()).digest()).decode().rstrip("=")
|
||||
print(grant.code_challenge, expected_challenge)
|
||||
if expected_challenge != grant.code_challenge:
|
||||
return JsonResponse({
|
||||
"error": "invalid_grant",
|
||||
"error_description": "Mismatch of code_verifier with code_challenge"
|
||||
}, status=400)
|
||||
else:
|
||||
raise ValueError("Unsupported code_challenge_method in database")
|
||||
|
||||
with transaction.atomic():
|
||||
token = self.client.access_tokens.create(
|
||||
customer=grant.customer,
|
||||
@@ -502,6 +542,7 @@ class ConfigurationView(View):
|
||||
'token_endpoint_auth_methods_supported': [
|
||||
'client_secret_post', 'client_secret_basic'
|
||||
],
|
||||
'code_challenge_methods_supported': ['S256'],
|
||||
'claims_supported': [
|
||||
'iss',
|
||||
'aud',
|
||||
|
||||
@@ -236,7 +236,8 @@ def provider(organizer):
|
||||
"response_modes_supported": ["query"],
|
||||
"grant_types_supported": ["authorization_code"],
|
||||
"scopes_supported": ["openid", "email", "profile"],
|
||||
"claims_supported": ["email", "sub"]
|
||||
"claims_supported": ["email", "sub"],
|
||||
"code_challenge_methods_supported": ["plain", "S256"],
|
||||
}
|
||||
}
|
||||
)
|
||||
@@ -244,6 +245,21 @@ def provider(organizer):
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_authorize_url(provider):
|
||||
assert (
|
||||
"https://example.com/authorize?"
|
||||
"response_type=code&"
|
||||
"client_id=abc123&"
|
||||
"scope=openid+email+profile&"
|
||||
"state=state_val&"
|
||||
"redirect_uri=https%3A%2F%2Fredirect%3Ffoo%3Dbar&"
|
||||
"code_challenge=S1ZnvzwMZHrWOO62nENdJ6jhODhf7VfyZFBIXQyrTKo&"
|
||||
"code_challenge_method=S256"
|
||||
) == oidc_authorize_url(provider, "state_val", "https://redirect?foo=bar", "pkce_value")
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_authorize_url_no_pkce(provider):
|
||||
del provider.configuration["provider_config"]["code_challenge_methods_supported"]
|
||||
assert (
|
||||
"https://example.com/authorize?"
|
||||
"response_type=code&"
|
||||
@@ -251,7 +267,7 @@ def test_authorize_url(provider):
|
||||
"scope=openid+email+profile&"
|
||||
"state=state_val&"
|
||||
"redirect_uri=https%3A%2F%2Fredirect%3Ffoo%3Dbar"
|
||||
) == oidc_authorize_url(provider, "state_val", "https://redirect?foo=bar")
|
||||
) == oidc_authorize_url(provider, "state_val", "https://redirect?foo=bar", "pkce_value")
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@@ -264,7 +280,7 @@ def test_validate_authorization_invalid(provider):
|
||||
status=400,
|
||||
)
|
||||
with pytest.raises(ValidationError):
|
||||
oidc_validate_authorization(provider, "code_received", "https://redirect?foo=bar")
|
||||
oidc_validate_authorization(provider, "code_received", "https://redirect?foo=bar", "pkce_value")
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@@ -281,6 +297,7 @@ def test_validate_authorization_userinfo_invalid(provider):
|
||||
"grant_type": "authorization_code",
|
||||
"code": "code_received",
|
||||
"redirect_uri": "https://redirect?foo=bar",
|
||||
"code_verifier": "pkce_value",
|
||||
})
|
||||
],
|
||||
)
|
||||
@@ -296,7 +313,7 @@ def test_validate_authorization_userinfo_invalid(provider):
|
||||
],
|
||||
)
|
||||
with pytest.raises(ValidationError) as e:
|
||||
oidc_validate_authorization(provider, "code_received", "https://redirect?foo=bar")
|
||||
oidc_validate_authorization(provider, "code_received", "https://redirect?foo=bar", "pkce_value")
|
||||
assert 'could not fetch' in str(e.value)
|
||||
|
||||
|
||||
@@ -314,6 +331,7 @@ def test_validate_authorization_valid(provider):
|
||||
"grant_type": "authorization_code",
|
||||
"code": "code_received",
|
||||
"redirect_uri": "https://redirect?foo=bar",
|
||||
"code_verifier": "pkce_value",
|
||||
})
|
||||
],
|
||||
)
|
||||
@@ -328,4 +346,4 @@ def test_validate_authorization_valid(provider):
|
||||
matchers.header_matcher({"Authorization": "Bearer test_access_token"})
|
||||
],
|
||||
)
|
||||
oidc_validate_authorization(provider, "code_received", "https://redirect?foo=bar")
|
||||
oidc_validate_authorization(provider, "code_received", "https://redirect?foo=bar", "pkce_value")
|
||||
|
||||
@@ -208,6 +208,37 @@ def test_authorize_with_prompt_none(env, client, ssoclient):
|
||||
assert re.match(r'https://example.net\?code=([a-z0-9A-Z]{64})&state=STATE', r.headers['Location'])
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_authorize_with_invalid_pkce_method(env, client, ssoclient):
|
||||
url = f'/bigevents/oauth2/v1/authorize?' \
|
||||
f'client_id={ssoclient[0].client_id}&' \
|
||||
f'redirect_uri=https://example.net&' \
|
||||
f'response_type=code&state=STATE&scope=openid+profile&' \
|
||||
f'code_challenge=pkce_value&code_challenge_method=plain'
|
||||
r = client.get(url)
|
||||
assert r.status_code == 302
|
||||
assert r.headers['Location'] == 'https://example.net?' \
|
||||
'error=invalid_request&' \
|
||||
'error_description=code_challenge+transform+algorithm+not+supported&' \
|
||||
'state=STATE'
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_authorize_with_missing_pkce_if_required(env, client, ssoclient):
|
||||
ssoclient[0].require_pkce = True
|
||||
ssoclient[0].save()
|
||||
url = f'/bigevents/oauth2/v1/authorize?' \
|
||||
f'client_id={ssoclient[0].client_id}&' \
|
||||
f'redirect_uri=https://example.net&' \
|
||||
f'response_type=code&state=STATE&scope=openid+profile'
|
||||
r = client.get(url)
|
||||
assert r.status_code == 302
|
||||
assert r.headers['Location'] == 'https://example.net?' \
|
||||
'error=invalid_request&' \
|
||||
'error_description=code_challenge+%28PKCE%29+required&' \
|
||||
'state=STATE'
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_authorize_require_login_if_prompt_requires_it_or_is_expired(env, client, ssoclient):
|
||||
with freeze_time("2021-04-10T11:00:00+02:00"):
|
||||
@@ -286,7 +317,7 @@ def test_token_require_client_id(env, client, ssoclient):
|
||||
assert b'unsupported_grant_type' in r.content
|
||||
|
||||
|
||||
def _authorization_step(client, ssoclient):
|
||||
def _authorization_step(client, ssoclient, code_challenge=None):
|
||||
r = client.post('/bigevents/account/login', {
|
||||
'email': 'john@example.org',
|
||||
'password': 'foo',
|
||||
@@ -299,6 +330,8 @@ def _authorization_step(client, ssoclient):
|
||||
f'client_id={ssoclient[0].client_id}&' \
|
||||
f'redirect_uri=https://example.net&' \
|
||||
f'response_type=code&state=STATE&scope=openid+profile+email+phone'
|
||||
if code_challenge:
|
||||
url += f'&code_challenge={code_challenge}&code_challenge_method=S256'
|
||||
r = client.get(url)
|
||||
assert r.status_code == 302
|
||||
m = re.match(r'https://example.net\?code=([a-z0-9A-Z]{64})&state=STATE', r.headers['Location'])
|
||||
@@ -373,6 +406,55 @@ def test_token_success(env, client, ssoclient):
|
||||
CustomerSSOAccessToken.objects.get(token=d['access_token']).expires < now()
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_token_pkce_required_if_used_in_authorization(env, client, ssoclient):
|
||||
code = _authorization_step(client, ssoclient, "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM")
|
||||
|
||||
r = client.post('/bigevents/oauth2/v1/token', {
|
||||
'grant_type': 'authorization_code',
|
||||
'code': code,
|
||||
'redirect_uri': 'https://example.net',
|
||||
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(f'{ssoclient[0].client_id}:{ssoclient[1]}'.encode()).decode())
|
||||
assert r.status_code == 400
|
||||
d = json.loads(r.content)
|
||||
assert d['error'] == 'invalid_grant'
|
||||
assert d['error_description'] == 'Missing of code_verifier'
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_token_pkce_incorrect(env, client, ssoclient):
|
||||
code = _authorization_step(client, ssoclient, "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM")
|
||||
|
||||
r = client.post('/bigevents/oauth2/v1/token', {
|
||||
'grant_type': 'authorization_code',
|
||||
'code': code,
|
||||
'redirect_uri': 'https://example.net',
|
||||
'code_verifier': "WRONG",
|
||||
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(f'{ssoclient[0].client_id}:{ssoclient[1]}'.encode()).decode())
|
||||
assert r.status_code == 400
|
||||
d = json.loads(r.content)
|
||||
assert d['error'] == 'invalid_grant'
|
||||
assert d['error_description'] == 'Mismatch of code_verifier with code_challenge'
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_token_success_pkce(env, client, ssoclient):
|
||||
# this is the sample from the actual RFC
|
||||
code_verifier = "dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk"
|
||||
code = _authorization_step(client, ssoclient, "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM")
|
||||
|
||||
r = client.post('/bigevents/oauth2/v1/token', {
|
||||
'grant_type': 'authorization_code',
|
||||
'code': code,
|
||||
'redirect_uri': 'https://example.net',
|
||||
'code_verifier': code_verifier,
|
||||
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(f'{ssoclient[0].client_id}:{ssoclient[1]}'.encode()).decode())
|
||||
print(r.content)
|
||||
assert r.status_code == 200
|
||||
d = json.loads(r.content)
|
||||
assert d['access_token']
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_scope_enforcement(env, client, ssoclient):
|
||||
ssoclient[0].allowed_scopes = ['openid', 'profile']
|
||||
|
||||
Reference in New Issue
Block a user