diff --git a/src/pretix/base/customersso/oidc.py b/src/pretix/base/customersso/oidc.py index ce1c6f786c..71e64e1d6c 100644 --- a/src/pretix/base/customersso/oidc.py +++ b/src/pretix/base/customersso/oidc.py @@ -148,7 +148,7 @@ def oidc_validate_and_complete_config(config): return config -def oidc_authorize_url(provider, state, redirect_uri): +def oidc_authorize_url(provider, state, redirect_uri, pkce_code_verifier): endpoint = provider.configuration['provider_config']['authorization_endpoint'] params = { # https://datatracker.ietf.org/doc/html/rfc6749#section-4.1.1 @@ -163,10 +163,14 @@ def oidc_authorize_url(provider, state, redirect_uri): if "query_parameters" in provider.configuration and provider.configuration["query_parameters"]: params.update(parse_qsl(provider.configuration["query_parameters"])) + if pkce_code_verifier and "S256" in provider.configuration['provider_config'].get('code_challenge_methods_supported', []): + params["code_challenge"] = base64.urlsafe_b64encode(hashlib.sha256(pkce_code_verifier.encode()).digest()).decode().rstrip("=") + params["code_challenge_method"] = "S256" + return endpoint + '?' + urlencode(params) -def oidc_validate_authorization(provider, code, redirect_uri): +def oidc_validate_authorization(provider, code, redirect_uri, pkce_code_verifier): endpoint = provider.configuration['provider_config']['token_endpoint'] # Wall of shame and RFC ignorant IDPs @@ -188,6 +192,9 @@ def oidc_validate_authorization(provider, code, redirect_uri): 'redirect_uri': redirect_uri, } + if pkce_code_verifier and "S256" in provider.configuration['provider_config'].get('code_challenge_methods_supported', []): + params["code_verifier"] = pkce_code_verifier + if token_endpoint_auth_method == 'client_secret_post': params['client_id'] = provider.configuration['client_id'] params['client_secret'] = provider.configuration['client_secret'] diff --git a/src/pretix/base/migrations/0277_customerssoclient_require_pkce_and_more.py b/src/pretix/base/migrations/0277_customerssoclient_require_pkce_and_more.py new file mode 100644 index 0000000000..21ffe8988d --- /dev/null +++ b/src/pretix/base/migrations/0277_customerssoclient_require_pkce_and_more.py @@ -0,0 +1,28 @@ +# Generated by Django 4.2.17 on 2025-02-07 16:42 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("pretixbase", "0276_item_hidden_if_item_available_mode"), + ] + + operations = [ + migrations.AddField( + model_name="customerssoclient", + name="require_pkce", + field=models.BooleanField(default=False), + ), + migrations.AddField( + model_name="customerssogrant", + name="code_challenge", + field=models.TextField(null=True), + ), + migrations.AddField( + model_name="customerssogrant", + name="code_challenge_method", + field=models.CharField(max_length=255, null=True), + ), + ] diff --git a/src/pretix/base/models/customers.py b/src/pretix/base/models/customers.py index b4b602344c..019f6fa87a 100644 --- a/src/pretix/base/models/customers.py +++ b/src/pretix/base/models/customers.py @@ -416,6 +416,10 @@ class CustomerSSOClient(LoggedModel): authorization_grant_type = models.CharField( max_length=32, choices=GRANT_TYPES, verbose_name=_("Grant type"), default=GRANT_AUTHORIZATION_CODE, ) + require_pkce = models.BooleanField( + verbose_name=_("Require PKCE extension"), + default=False, + ) redirect_uris = models.TextField( blank=False, verbose_name=_("Redirection URIs"), @@ -481,6 +485,8 @@ class CustomerSSOGrant(models.Model): expires = models.DateTimeField() redirect_uri = models.TextField() scope = models.TextField(blank=True) + code_challenge = models.TextField(blank=True, null=True) + code_challenge_method = models.CharField(max_length=255, blank=True, null=True) class CustomerSSOAccessToken(models.Model): diff --git a/src/pretix/control/forms/organizer.py b/src/pretix/control/forms/organizer.py index cf2f24f8b5..2cef1c657d 100644 --- a/src/pretix/control/forms/organizer.py +++ b/src/pretix/control/forms/organizer.py @@ -1113,7 +1113,7 @@ class SSOClientForm(I18nModelForm): class Meta: model = CustomerSSOClient fields = ['is_active', 'name', 'client_id', 'client_type', 'authorization_grant_type', 'redirect_uris', - 'allowed_scopes'] + 'allowed_scopes', 'require_pkce'] widgets = { 'authorization_grant_type': forms.RadioSelect, 'client_type': forms.RadioSelect, diff --git a/src/pretix/presale/views/customer.py b/src/pretix/presale/views/customer.py index 693128b1cb..5d8fd42ab3 100644 --- a/src/pretix/presale/views/customer.py +++ b/src/pretix/presale/views/customer.py @@ -676,6 +676,8 @@ class SSOLoginView(RedirectBackMixin, View): popup_origin = None nonce = get_random_string(32) + pkce_code_verifier = get_random_string(64) + request.session[f'pretix_customerauth_{self.provider.pk}_pkce_code_verifier'] = pkce_code_verifier request.session[f'pretix_customerauth_{self.provider.pk}_nonce'] = nonce request.session[f'pretix_customerauth_{self.provider.pk}_popup_origin'] = popup_origin request.session[f'pretix_customerauth_{self.provider.pk}_cross_domain_requested'] = self.request.GET.get("request_cross_domain_customer_auth") == "true" @@ -684,7 +686,7 @@ class SSOLoginView(RedirectBackMixin, View): }) if self.provider.method == "oidc": - return redirect_to_url(oidc_authorize_url(self.provider, f'{nonce}%{next_url}', redirect_uri)) + return redirect_to_url(oidc_authorize_url(self.provider, f'{nonce}%{next_url}', redirect_uri, pkce_code_verifier)) else: raise Http404("Unknown SSO method.") @@ -718,6 +720,7 @@ class SSOLoginReturnView(RedirectBackMixin, View): ) return HttpResponseRedirect(redirect_to) r = super().dispatch(request, *args, **kwargs) + request.session.pop(f'pretix_customerauth_{self.provider.pk}_pkce_code_verifier', None) request.session.pop(f'pretix_customerauth_{self.provider.pk}_nonce', None) request.session.pop(f'pretix_customerauth_{self.provider.pk}_popup_origin', None) request.session.pop(f'pretix_customerauth_{self.provider.pk}_cross_domain_requested', None) @@ -763,6 +766,7 @@ class SSOLoginReturnView(RedirectBackMixin, View): self.provider, request.GET.get('code'), redirect_uri, + request.session.get(f'pretix_customerauth_{self.provider.pk}_pkce_code_verifier'), ) except ValidationError as e: for msg in e: diff --git a/src/pretix/presale/views/oidc_op.py b/src/pretix/presale/views/oidc_op.py index 6630fd074c..2c23360e7f 100644 --- a/src/pretix/presale/views/oidc_op.py +++ b/src/pretix/presale/views/oidc_op.py @@ -72,6 +72,9 @@ We currently do not implement the following optional parts of the spec: We also implement the Discovery extension (without issuer discovery) as per https://openid.net/specs/openid-connect-discovery-1_0.html +We also implement the PKCE extension for OAuth: +https://www.rfc-editor.org/rfc/rfc7636 + The implementation passed the certification tests against the following profiles, but we did not acquire formal certification: @@ -136,19 +139,22 @@ class AuthorizeView(View): self._construct_redirect_uri(redirect_uri, response_mode, qs) ) - def _require_login(self, request, client, scope, redirect_uri, response_type, response_mode, state, nonce): + def _require_login(self, request, client, scope, redirect_uri, response_type, response_mode, state, nonce, + code_challenge, code_challenge_method): form = AuthenticationForm(data=request.POST if "login-email" in request.POST else None, request=request, prefix="login") if "login-email" in request.POST and form.is_valid(): customer_login(request, form.get_customer()) - return self._success(client, scope, redirect_uri, response_type, response_mode, state, nonce, form.get_customer()) + return self._success(client, scope, redirect_uri, response_type, response_mode, state, nonce, + code_challenge, code_challenge_method, form.get_customer()) else: return render(request, 'pretixpresale/organizers/customer_login.html', { 'providers': [], 'form': form, }) - def _success(self, client, scope, redirect_uri, response_type, response_mode, state, nonce, customer): + def _success(self, client, scope, redirect_uri, response_type, response_mode, state, nonce, code_challenge, + code_challenge_method, customer): response_type = response_type.split(' ') qs = {} id_token_kwargs = {} @@ -162,6 +168,8 @@ class AuthorizeView(View): expires=now() + timedelta(minutes=10), auth_time=get_customer_auth_time(self.request), nonce=nonce, + code_challenge=code_challenge, + code_challenge_method=code_challenge_method, ) qs['code'] = grant.code id_token_kwargs['with_code'] = grant.code @@ -209,6 +217,8 @@ class AuthorizeView(View): prompt = request_data.get("prompt") response_type = request_data.get("response_type") scope = request_data.get("scope", "").split(" ") + code_challenge = request_data.get("code_challenge") + code_challenge_method = request_data.get("code_challenge_method") if not client_id: return self._final_error("invalid_request", "client_id missing") @@ -247,6 +257,16 @@ class AuthorizeView(View): return self._redirect_error("invalid_request", "id_token_hint currently not supported by this server", redirect_uri, response_mode, state) + if code_challenge and code_challenge_method != "S256": + # "Clients re permitted to use "plain" only if they cannot support "S256" for some technical reason and + # know via out-of-band configuration that the S256 MUST be implemented, plain is not mandatory." + return self._redirect_error("invalid_request", "code_challenge transform algorithm not supported", + redirect_uri, response_mode, state) + + if client.require_pkce and not code_challenge: + return self._redirect_error("invalid_request", "code_challenge (PKCE) required", + redirect_uri, response_mode, state) + has_valid_session = bool(request.customer) if has_valid_session and max_age: try: @@ -262,9 +282,11 @@ class AuthorizeView(View): has_valid_session = False if has_valid_session: - return self._success(client, scope, redirect_uri, response_type, response_mode, state, nonce, request.customer) + return self._success(client, scope, redirect_uri, response_type, response_mode, state, nonce, code_challenge, + code_challenge_method, request.customer) else: - return self._require_login(request, client, scope, redirect_uri, response_type, response_mode, state, nonce) + return self._require_login(request, client, scope, redirect_uri, response_type, response_mode, state, nonce, + code_challenge, code_challenge_method) class TokenView(View): @@ -362,6 +384,24 @@ class TokenView(View): "error_description": "Mismatch of redirect_uri" }, status=400) + if grant.code_challenge: + if not request.POST.get("code_verifier"): + return JsonResponse({ + "error": "invalid_grant", + "error_description": "Missing of code_verifier" + }, status=400) + + if grant.code_challenge_method == "S256": + expected_challenge = base64.urlsafe_b64encode(hashlib.sha256(request.POST["code_verifier"].encode()).digest()).decode().rstrip("=") + print(grant.code_challenge, expected_challenge) + if expected_challenge != grant.code_challenge: + return JsonResponse({ + "error": "invalid_grant", + "error_description": "Mismatch of code_verifier with code_challenge" + }, status=400) + else: + raise ValueError("Unsupported code_challenge_method in database") + with transaction.atomic(): token = self.client.access_tokens.create( customer=grant.customer, @@ -503,6 +543,7 @@ class ConfigurationView(View): 'token_endpoint_auth_methods_supported': [ 'client_secret_post', 'client_secret_basic' ], + 'code_challenge_methods_supported': ['S256'], 'claims_supported': [ 'iss', 'aud', diff --git a/src/tests/base/test_customer_oidc_rp.py b/src/tests/base/test_customer_oidc_rp.py index 0f979015d1..a0af129287 100644 --- a/src/tests/base/test_customer_oidc_rp.py +++ b/src/tests/base/test_customer_oidc_rp.py @@ -236,7 +236,8 @@ def provider(organizer): "response_modes_supported": ["query"], "grant_types_supported": ["authorization_code"], "scopes_supported": ["openid", "email", "profile"], - "claims_supported": ["email", "sub"] + "claims_supported": ["email", "sub"], + "code_challenge_methods_supported": ["plain", "S256"], } } ) @@ -244,6 +245,21 @@ def provider(organizer): @pytest.mark.django_db def test_authorize_url(provider): + assert ( + "https://example.com/authorize?" + "response_type=code&" + "client_id=abc123&" + "scope=openid+email+profile&" + "state=state_val&" + "redirect_uri=https%3A%2F%2Fredirect%3Ffoo%3Dbar&" + "code_challenge=S1ZnvzwMZHrWOO62nENdJ6jhODhf7VfyZFBIXQyrTKo&" + "code_challenge_method=S256" + ) == oidc_authorize_url(provider, "state_val", "https://redirect?foo=bar", "pkce_value") + + +@pytest.mark.django_db +def test_authorize_url_no_pkce(provider): + del provider.configuration["provider_config"]["code_challenge_methods_supported"] assert ( "https://example.com/authorize?" "response_type=code&" @@ -251,7 +267,7 @@ def test_authorize_url(provider): "scope=openid+email+profile&" "state=state_val&" "redirect_uri=https%3A%2F%2Fredirect%3Ffoo%3Dbar" - ) == oidc_authorize_url(provider, "state_val", "https://redirect?foo=bar") + ) == oidc_authorize_url(provider, "state_val", "https://redirect?foo=bar", "pkce_value") @pytest.mark.django_db @@ -264,7 +280,7 @@ def test_validate_authorization_invalid(provider): status=400, ) with pytest.raises(ValidationError): - oidc_validate_authorization(provider, "code_received", "https://redirect?foo=bar") + oidc_validate_authorization(provider, "code_received", "https://redirect?foo=bar", "pkce_value") @pytest.mark.django_db @@ -281,6 +297,7 @@ def test_validate_authorization_userinfo_invalid(provider): "grant_type": "authorization_code", "code": "code_received", "redirect_uri": "https://redirect?foo=bar", + "code_verifier": "pkce_value", }) ], ) @@ -296,7 +313,7 @@ def test_validate_authorization_userinfo_invalid(provider): ], ) with pytest.raises(ValidationError) as e: - oidc_validate_authorization(provider, "code_received", "https://redirect?foo=bar") + oidc_validate_authorization(provider, "code_received", "https://redirect?foo=bar", "pkce_value") assert 'could not fetch' in str(e.value) @@ -314,6 +331,7 @@ def test_validate_authorization_valid(provider): "grant_type": "authorization_code", "code": "code_received", "redirect_uri": "https://redirect?foo=bar", + "code_verifier": "pkce_value", }) ], ) @@ -328,4 +346,4 @@ def test_validate_authorization_valid(provider): matchers.header_matcher({"Authorization": "Bearer test_access_token"}) ], ) - oidc_validate_authorization(provider, "code_received", "https://redirect?foo=bar") + oidc_validate_authorization(provider, "code_received", "https://redirect?foo=bar", "pkce_value") diff --git a/src/tests/presale/test_oidc_op.py b/src/tests/presale/test_oidc_op.py index c097605999..161c16d555 100644 --- a/src/tests/presale/test_oidc_op.py +++ b/src/tests/presale/test_oidc_op.py @@ -208,6 +208,37 @@ def test_authorize_with_prompt_none(env, client, ssoclient): assert re.match(r'https://example.net\?code=([a-z0-9A-Z]{64})&state=STATE', r.headers['Location']) +@pytest.mark.django_db +def test_authorize_with_invalid_pkce_method(env, client, ssoclient): + url = f'/bigevents/oauth2/v1/authorize?' \ + f'client_id={ssoclient[0].client_id}&' \ + f'redirect_uri=https://example.net&' \ + f'response_type=code&state=STATE&scope=openid+profile&' \ + f'code_challenge=pkce_value&code_challenge_method=plain' + r = client.get(url) + assert r.status_code == 302 + assert r.headers['Location'] == 'https://example.net?' \ + 'error=invalid_request&' \ + 'error_description=code_challenge+transform+algorithm+not+supported&' \ + 'state=STATE' + + +@pytest.mark.django_db +def test_authorize_with_missing_pkce_if_required(env, client, ssoclient): + ssoclient[0].require_pkce = True + ssoclient[0].save() + url = f'/bigevents/oauth2/v1/authorize?' \ + f'client_id={ssoclient[0].client_id}&' \ + f'redirect_uri=https://example.net&' \ + f'response_type=code&state=STATE&scope=openid+profile' + r = client.get(url) + assert r.status_code == 302 + assert r.headers['Location'] == 'https://example.net?' \ + 'error=invalid_request&' \ + 'error_description=code_challenge+%28PKCE%29+required&' \ + 'state=STATE' + + @pytest.mark.django_db def test_authorize_require_login_if_prompt_requires_it_or_is_expired(env, client, ssoclient): with freeze_time("2021-04-10T11:00:00+02:00"): @@ -286,7 +317,7 @@ def test_token_require_client_id(env, client, ssoclient): assert b'unsupported_grant_type' in r.content -def _authorization_step(client, ssoclient): +def _authorization_step(client, ssoclient, code_challenge=None): r = client.post('/bigevents/account/login', { 'email': 'john@example.org', 'password': 'foo', @@ -299,6 +330,8 @@ def _authorization_step(client, ssoclient): f'client_id={ssoclient[0].client_id}&' \ f'redirect_uri=https://example.net&' \ f'response_type=code&state=STATE&scope=openid+profile+email+phone' + if code_challenge: + url += f'&code_challenge={code_challenge}&code_challenge_method=S256' r = client.get(url) assert r.status_code == 302 m = re.match(r'https://example.net\?code=([a-z0-9A-Z]{64})&state=STATE', r.headers['Location']) @@ -373,6 +406,55 @@ def test_token_success(env, client, ssoclient): CustomerSSOAccessToken.objects.get(token=d['access_token']).expires < now() +@pytest.mark.django_db +def test_token_pkce_required_if_used_in_authorization(env, client, ssoclient): + code = _authorization_step(client, ssoclient, "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM") + + r = client.post('/bigevents/oauth2/v1/token', { + 'grant_type': 'authorization_code', + 'code': code, + 'redirect_uri': 'https://example.net', + }, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(f'{ssoclient[0].client_id}:{ssoclient[1]}'.encode()).decode()) + assert r.status_code == 400 + d = json.loads(r.content) + assert d['error'] == 'invalid_grant' + assert d['error_description'] == 'Missing of code_verifier' + + +@pytest.mark.django_db +def test_token_pkce_incorrect(env, client, ssoclient): + code = _authorization_step(client, ssoclient, "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM") + + r = client.post('/bigevents/oauth2/v1/token', { + 'grant_type': 'authorization_code', + 'code': code, + 'redirect_uri': 'https://example.net', + 'code_verifier': "WRONG", + }, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(f'{ssoclient[0].client_id}:{ssoclient[1]}'.encode()).decode()) + assert r.status_code == 400 + d = json.loads(r.content) + assert d['error'] == 'invalid_grant' + assert d['error_description'] == 'Mismatch of code_verifier with code_challenge' + + +@pytest.mark.django_db +def test_token_success_pkce(env, client, ssoclient): + # this is the sample from the actual RFC + code_verifier = "dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk" + code = _authorization_step(client, ssoclient, "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM") + + r = client.post('/bigevents/oauth2/v1/token', { + 'grant_type': 'authorization_code', + 'code': code, + 'redirect_uri': 'https://example.net', + 'code_verifier': code_verifier, + }, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(f'{ssoclient[0].client_id}:{ssoclient[1]}'.encode()).decode()) + print(r.content) + assert r.status_code == 200 + d = json.loads(r.content) + assert d['access_token'] + + @pytest.mark.django_db def test_scope_enforcement(env, client, ssoclient): ssoclient[0].allowed_scopes = ['openid', 'profile']