Compare commits

...

3 Commits

Author SHA1 Message Date
Raphael Michel
a5beda77f8 Auto-detection 2025-02-21 12:56:42 +01:00
Raphael Michel
ef7f212063 OIDC: Implement PKCE as server 2025-02-07 18:02:43 +01:00
Raphael Michel
26a5eab47f OIDC: Implement PKCE in client implementation 2025-02-07 18:02:43 +01:00
8 changed files with 201 additions and 15 deletions

View File

@@ -148,7 +148,7 @@ def oidc_validate_and_complete_config(config):
return config
def oidc_authorize_url(provider, state, redirect_uri):
def oidc_authorize_url(provider, state, redirect_uri, pkce_code_verifier):
endpoint = provider.configuration['provider_config']['authorization_endpoint']
params = {
# https://datatracker.ietf.org/doc/html/rfc6749#section-4.1.1
@@ -163,10 +163,14 @@ def oidc_authorize_url(provider, state, redirect_uri):
if "query_parameters" in provider.configuration and provider.configuration["query_parameters"]:
params.update(parse_qsl(provider.configuration["query_parameters"]))
if pkce_code_verifier and "S256" in provider.configuration['provider_config'].get('code_challenge_methods_supported', []):
params["code_challenge"] = base64.urlsafe_b64encode(hashlib.sha256(pkce_code_verifier.encode()).digest()).decode().rstrip("=")
params["code_challenge_method"] = "S256"
return endpoint + '?' + urlencode(params)
def oidc_validate_authorization(provider, code, redirect_uri):
def oidc_validate_authorization(provider, code, redirect_uri, pkce_code_verifier):
endpoint = provider.configuration['provider_config']['token_endpoint']
# Wall of shame and RFC ignorant IDPs
@@ -188,6 +192,9 @@ def oidc_validate_authorization(provider, code, redirect_uri):
'redirect_uri': redirect_uri,
}
if pkce_code_verifier and "S256" in provider.configuration['provider_config'].get('code_challenge_methods_supported', []):
params["code_verifier"] = pkce_code_verifier
if token_endpoint_auth_method == 'client_secret_post':
params['client_id'] = provider.configuration['client_id']
params['client_secret'] = provider.configuration['client_secret']

View File

@@ -0,0 +1,28 @@
# Generated by Django 4.2.17 on 2025-02-07 16:42
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("pretixbase", "0276_item_hidden_if_item_available_mode"),
]
operations = [
migrations.AddField(
model_name="customerssoclient",
name="require_pkce",
field=models.BooleanField(default=False),
),
migrations.AddField(
model_name="customerssogrant",
name="code_challenge",
field=models.TextField(null=True),
),
migrations.AddField(
model_name="customerssogrant",
name="code_challenge_method",
field=models.CharField(max_length=255, null=True),
),
]

View File

@@ -416,6 +416,10 @@ class CustomerSSOClient(LoggedModel):
authorization_grant_type = models.CharField(
max_length=32, choices=GRANT_TYPES, verbose_name=_("Grant type"), default=GRANT_AUTHORIZATION_CODE,
)
require_pkce = models.BooleanField(
verbose_name=_("Require PKCE extension"),
default=False,
)
redirect_uris = models.TextField(
blank=False,
verbose_name=_("Redirection URIs"),
@@ -481,6 +485,8 @@ class CustomerSSOGrant(models.Model):
expires = models.DateTimeField()
redirect_uri = models.TextField()
scope = models.TextField(blank=True)
code_challenge = models.TextField(blank=True, null=True)
code_challenge_method = models.CharField(max_length=255, blank=True, null=True)
class CustomerSSOAccessToken(models.Model):

View File

@@ -1113,7 +1113,7 @@ class SSOClientForm(I18nModelForm):
class Meta:
model = CustomerSSOClient
fields = ['is_active', 'name', 'client_id', 'client_type', 'authorization_grant_type', 'redirect_uris',
'allowed_scopes']
'allowed_scopes', 'require_pkce']
widgets = {
'authorization_grant_type': forms.RadioSelect,
'client_type': forms.RadioSelect,

View File

@@ -676,6 +676,8 @@ class SSOLoginView(RedirectBackMixin, View):
popup_origin = None
nonce = get_random_string(32)
pkce_code_verifier = get_random_string(64)
request.session[f'pretix_customerauth_{self.provider.pk}_pkce_code_verifier'] = pkce_code_verifier
request.session[f'pretix_customerauth_{self.provider.pk}_nonce'] = nonce
request.session[f'pretix_customerauth_{self.provider.pk}_popup_origin'] = popup_origin
request.session[f'pretix_customerauth_{self.provider.pk}_cross_domain_requested'] = self.request.GET.get("request_cross_domain_customer_auth") == "true"
@@ -684,7 +686,7 @@ class SSOLoginView(RedirectBackMixin, View):
})
if self.provider.method == "oidc":
return redirect_to_url(oidc_authorize_url(self.provider, f'{nonce}%{next_url}', redirect_uri))
return redirect_to_url(oidc_authorize_url(self.provider, f'{nonce}%{next_url}', redirect_uri, pkce_code_verifier))
else:
raise Http404("Unknown SSO method.")
@@ -718,6 +720,7 @@ class SSOLoginReturnView(RedirectBackMixin, View):
)
return HttpResponseRedirect(redirect_to)
r = super().dispatch(request, *args, **kwargs)
request.session.pop(f'pretix_customerauth_{self.provider.pk}_pkce_code_verifier', None)
request.session.pop(f'pretix_customerauth_{self.provider.pk}_nonce', None)
request.session.pop(f'pretix_customerauth_{self.provider.pk}_popup_origin', None)
request.session.pop(f'pretix_customerauth_{self.provider.pk}_cross_domain_requested', None)
@@ -763,6 +766,7 @@ class SSOLoginReturnView(RedirectBackMixin, View):
self.provider,
request.GET.get('code'),
redirect_uri,
request.session.get(f'pretix_customerauth_{self.provider.pk}_pkce_code_verifier'),
)
except ValidationError as e:
for msg in e:

View File

@@ -72,6 +72,9 @@ We currently do not implement the following optional parts of the spec:
We also implement the Discovery extension (without issuer discovery)
as per https://openid.net/specs/openid-connect-discovery-1_0.html
We also implement the PKCE extension for OAuth:
https://www.rfc-editor.org/rfc/rfc7636
The implementation passed the certification tests against the following profiles, but we did not
acquire formal certification:
@@ -136,19 +139,22 @@ class AuthorizeView(View):
self._construct_redirect_uri(redirect_uri, response_mode, qs)
)
def _require_login(self, request, client, scope, redirect_uri, response_type, response_mode, state, nonce):
def _require_login(self, request, client, scope, redirect_uri, response_type, response_mode, state, nonce,
code_challenge, code_challenge_method):
form = AuthenticationForm(data=request.POST if "login-email" in request.POST else None, request=request,
prefix="login")
if "login-email" in request.POST and form.is_valid():
customer_login(request, form.get_customer())
return self._success(client, scope, redirect_uri, response_type, response_mode, state, nonce, form.get_customer())
return self._success(client, scope, redirect_uri, response_type, response_mode, state, nonce,
code_challenge, code_challenge_method, form.get_customer())
else:
return render(request, 'pretixpresale/organizers/customer_login.html', {
'providers': [],
'form': form,
})
def _success(self, client, scope, redirect_uri, response_type, response_mode, state, nonce, customer):
def _success(self, client, scope, redirect_uri, response_type, response_mode, state, nonce, code_challenge,
code_challenge_method, customer):
response_type = response_type.split(' ')
qs = {}
id_token_kwargs = {}
@@ -162,6 +168,8 @@ class AuthorizeView(View):
expires=now() + timedelta(minutes=10),
auth_time=get_customer_auth_time(self.request),
nonce=nonce,
code_challenge=code_challenge,
code_challenge_method=code_challenge_method,
)
qs['code'] = grant.code
id_token_kwargs['with_code'] = grant.code
@@ -209,6 +217,8 @@ class AuthorizeView(View):
prompt = request_data.get("prompt")
response_type = request_data.get("response_type")
scope = request_data.get("scope", "").split(" ")
code_challenge = request_data.get("code_challenge")
code_challenge_method = request_data.get("code_challenge_method")
if not client_id:
return self._final_error("invalid_request", "client_id missing")
@@ -247,6 +257,16 @@ class AuthorizeView(View):
self._redirect_error("invalid_request", "id_token_hint currently not supported by this server",
redirect_uri, response_mode, state)
if code_challenge and code_challenge_method != "S256":
# "Clients re permitted to use "plain" only if they cannot support "S256" for some technical reason and
# know via out-of-band configuration that the S256 MUST be implemented, plain is not mandatory."
return self._redirect_error("invalid_request", "code_challenge transform algorithm not supported",
redirect_uri, response_mode, state)
if client.require_pkce and not code_challenge:
return self._redirect_error("invalid_request", "code_challenge (PKCE) required",
redirect_uri, response_mode, state)
has_valid_session = bool(request.customer)
if has_valid_session and max_age:
try:
@@ -261,9 +281,11 @@ class AuthorizeView(View):
has_valid_session = False
if has_valid_session:
return self._success(client, scope, redirect_uri, response_type, response_mode, state, nonce, request.customer)
return self._success(client, scope, redirect_uri, response_type, response_mode, state, nonce, code_challenge,
code_challenge_method, request.customer)
else:
return self._require_login(request, client, scope, redirect_uri, response_type, response_mode, state, nonce)
return self._require_login(request, client, scope, redirect_uri, response_type, response_mode, state, nonce,
code_challenge, code_challenge_method)
class TokenView(View):
@@ -361,6 +383,24 @@ class TokenView(View):
"error_description": "Mismatch of redirect_uri"
}, status=400)
if grant.code_challenge:
if not request.POST.get("code_verifier"):
return JsonResponse({
"error": "invalid_grant",
"error_description": "Missing of code_verifier"
}, status=400)
if grant.code_challenge_method == "S256":
expected_challenge = base64.urlsafe_b64encode(hashlib.sha256(request.POST["code_verifier"].encode()).digest()).decode().rstrip("=")
print(grant.code_challenge, expected_challenge)
if expected_challenge != grant.code_challenge:
return JsonResponse({
"error": "invalid_grant",
"error_description": "Mismatch of code_verifier with code_challenge"
}, status=400)
else:
raise ValueError("Unsupported code_challenge_method in database")
with transaction.atomic():
token = self.client.access_tokens.create(
customer=grant.customer,
@@ -502,6 +542,7 @@ class ConfigurationView(View):
'token_endpoint_auth_methods_supported': [
'client_secret_post', 'client_secret_basic'
],
'code_challenge_methods_supported': ['S256'],
'claims_supported': [
'iss',
'aud',

View File

@@ -236,7 +236,8 @@ def provider(organizer):
"response_modes_supported": ["query"],
"grant_types_supported": ["authorization_code"],
"scopes_supported": ["openid", "email", "profile"],
"claims_supported": ["email", "sub"]
"claims_supported": ["email", "sub"],
"code_challenge_methods_supported": ["plain", "S256"],
}
}
)
@@ -244,6 +245,21 @@ def provider(organizer):
@pytest.mark.django_db
def test_authorize_url(provider):
assert (
"https://example.com/authorize?"
"response_type=code&"
"client_id=abc123&"
"scope=openid+email+profile&"
"state=state_val&"
"redirect_uri=https%3A%2F%2Fredirect%3Ffoo%3Dbar&"
"code_challenge=S1ZnvzwMZHrWOO62nENdJ6jhODhf7VfyZFBIXQyrTKo&"
"code_challenge_method=S256"
) == oidc_authorize_url(provider, "state_val", "https://redirect?foo=bar", "pkce_value")
@pytest.mark.django_db
def test_authorize_url_no_pkce(provider):
del provider.configuration["provider_config"]["code_challenge_methods_supported"]
assert (
"https://example.com/authorize?"
"response_type=code&"
@@ -251,7 +267,7 @@ def test_authorize_url(provider):
"scope=openid+email+profile&"
"state=state_val&"
"redirect_uri=https%3A%2F%2Fredirect%3Ffoo%3Dbar"
) == oidc_authorize_url(provider, "state_val", "https://redirect?foo=bar")
) == oidc_authorize_url(provider, "state_val", "https://redirect?foo=bar", "pkce_value")
@pytest.mark.django_db
@@ -264,7 +280,7 @@ def test_validate_authorization_invalid(provider):
status=400,
)
with pytest.raises(ValidationError):
oidc_validate_authorization(provider, "code_received", "https://redirect?foo=bar")
oidc_validate_authorization(provider, "code_received", "https://redirect?foo=bar", "pkce_value")
@pytest.mark.django_db
@@ -281,6 +297,7 @@ def test_validate_authorization_userinfo_invalid(provider):
"grant_type": "authorization_code",
"code": "code_received",
"redirect_uri": "https://redirect?foo=bar",
"code_verifier": "pkce_value",
})
],
)
@@ -296,7 +313,7 @@ def test_validate_authorization_userinfo_invalid(provider):
],
)
with pytest.raises(ValidationError) as e:
oidc_validate_authorization(provider, "code_received", "https://redirect?foo=bar")
oidc_validate_authorization(provider, "code_received", "https://redirect?foo=bar", "pkce_value")
assert 'could not fetch' in str(e.value)
@@ -314,6 +331,7 @@ def test_validate_authorization_valid(provider):
"grant_type": "authorization_code",
"code": "code_received",
"redirect_uri": "https://redirect?foo=bar",
"code_verifier": "pkce_value",
})
],
)
@@ -328,4 +346,4 @@ def test_validate_authorization_valid(provider):
matchers.header_matcher({"Authorization": "Bearer test_access_token"})
],
)
oidc_validate_authorization(provider, "code_received", "https://redirect?foo=bar")
oidc_validate_authorization(provider, "code_received", "https://redirect?foo=bar", "pkce_value")

View File

@@ -208,6 +208,37 @@ def test_authorize_with_prompt_none(env, client, ssoclient):
assert re.match(r'https://example.net\?code=([a-z0-9A-Z]{64})&state=STATE', r.headers['Location'])
@pytest.mark.django_db
def test_authorize_with_invalid_pkce_method(env, client, ssoclient):
url = f'/bigevents/oauth2/v1/authorize?' \
f'client_id={ssoclient[0].client_id}&' \
f'redirect_uri=https://example.net&' \
f'response_type=code&state=STATE&scope=openid+profile&' \
f'code_challenge=pkce_value&code_challenge_method=plain'
r = client.get(url)
assert r.status_code == 302
assert r.headers['Location'] == 'https://example.net?' \
'error=invalid_request&' \
'error_description=code_challenge+transform+algorithm+not+supported&' \
'state=STATE'
@pytest.mark.django_db
def test_authorize_with_missing_pkce_if_required(env, client, ssoclient):
ssoclient[0].require_pkce = True
ssoclient[0].save()
url = f'/bigevents/oauth2/v1/authorize?' \
f'client_id={ssoclient[0].client_id}&' \
f'redirect_uri=https://example.net&' \
f'response_type=code&state=STATE&scope=openid+profile'
r = client.get(url)
assert r.status_code == 302
assert r.headers['Location'] == 'https://example.net?' \
'error=invalid_request&' \
'error_description=code_challenge+%28PKCE%29+required&' \
'state=STATE'
@pytest.mark.django_db
def test_authorize_require_login_if_prompt_requires_it_or_is_expired(env, client, ssoclient):
with freeze_time("2021-04-10T11:00:00+02:00"):
@@ -286,7 +317,7 @@ def test_token_require_client_id(env, client, ssoclient):
assert b'unsupported_grant_type' in r.content
def _authorization_step(client, ssoclient):
def _authorization_step(client, ssoclient, code_challenge=None):
r = client.post('/bigevents/account/login', {
'email': 'john@example.org',
'password': 'foo',
@@ -299,6 +330,8 @@ def _authorization_step(client, ssoclient):
f'client_id={ssoclient[0].client_id}&' \
f'redirect_uri=https://example.net&' \
f'response_type=code&state=STATE&scope=openid+profile+email+phone'
if code_challenge:
url += f'&code_challenge={code_challenge}&code_challenge_method=S256'
r = client.get(url)
assert r.status_code == 302
m = re.match(r'https://example.net\?code=([a-z0-9A-Z]{64})&state=STATE', r.headers['Location'])
@@ -373,6 +406,55 @@ def test_token_success(env, client, ssoclient):
CustomerSSOAccessToken.objects.get(token=d['access_token']).expires < now()
@pytest.mark.django_db
def test_token_pkce_required_if_used_in_authorization(env, client, ssoclient):
code = _authorization_step(client, ssoclient, "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM")
r = client.post('/bigevents/oauth2/v1/token', {
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': 'https://example.net',
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(f'{ssoclient[0].client_id}:{ssoclient[1]}'.encode()).decode())
assert r.status_code == 400
d = json.loads(r.content)
assert d['error'] == 'invalid_grant'
assert d['error_description'] == 'Missing of code_verifier'
@pytest.mark.django_db
def test_token_pkce_incorrect(env, client, ssoclient):
code = _authorization_step(client, ssoclient, "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM")
r = client.post('/bigevents/oauth2/v1/token', {
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': 'https://example.net',
'code_verifier': "WRONG",
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(f'{ssoclient[0].client_id}:{ssoclient[1]}'.encode()).decode())
assert r.status_code == 400
d = json.loads(r.content)
assert d['error'] == 'invalid_grant'
assert d['error_description'] == 'Mismatch of code_verifier with code_challenge'
@pytest.mark.django_db
def test_token_success_pkce(env, client, ssoclient):
# this is the sample from the actual RFC
code_verifier = "dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk"
code = _authorization_step(client, ssoclient, "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM")
r = client.post('/bigevents/oauth2/v1/token', {
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': 'https://example.net',
'code_verifier': code_verifier,
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(f'{ssoclient[0].client_id}:{ssoclient[1]}'.encode()).decode())
print(r.content)
assert r.status_code == 200
d = json.loads(r.content)
assert d['access_token']
@pytest.mark.django_db
def test_scope_enforcement(env, client, ssoclient):
ssoclient[0].allowed_scopes = ['openid', 'profile']