forked from CGM_Public/pretix_original
Compare commits
1 Commits
show-event
...
oauth-tool
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2b40c9f44c |
77
src/pretix/api/migrations/0009_auto_20221217_1847.py
Normal file
77
src/pretix/api/migrations/0009_auto_20221217_1847.py
Normal file
@@ -0,0 +1,77 @@
|
|||||||
|
# Generated by Django 3.2.16 on 2022-12-17 18:47
|
||||||
|
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
import django.db.models.deletion
|
||||||
|
import oauth2_provider.generators
|
||||||
|
import oauth2_provider.models
|
||||||
|
from django.conf import settings
|
||||||
|
from django.db import migrations, models
|
||||||
|
|
||||||
|
|
||||||
|
class Migration(migrations.Migration):
|
||||||
|
|
||||||
|
dependencies = [
|
||||||
|
('pretixbase', '0226_itemvariationmetavalue'),
|
||||||
|
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
|
||||||
|
('pretixapi', '0008_webhookcallretry'),
|
||||||
|
]
|
||||||
|
run_before = [
|
||||||
|
('oauth2_provider', '0002_auto_20190406_1805'),
|
||||||
|
]
|
||||||
|
|
||||||
|
operations = [
|
||||||
|
migrations.AddField(
|
||||||
|
model_name='oauthapplication',
|
||||||
|
name='algorithm',
|
||||||
|
field=models.CharField(default='', max_length=5),
|
||||||
|
),
|
||||||
|
migrations.AddField(
|
||||||
|
model_name='oauthgrant',
|
||||||
|
name='claims',
|
||||||
|
field=models.TextField(default=''),
|
||||||
|
preserve_default=False,
|
||||||
|
),
|
||||||
|
migrations.AddField(
|
||||||
|
model_name='oauthgrant',
|
||||||
|
name='code_challenge',
|
||||||
|
field=models.CharField(default='', max_length=128),
|
||||||
|
),
|
||||||
|
migrations.AddField(
|
||||||
|
model_name='oauthgrant',
|
||||||
|
name='code_challenge_method',
|
||||||
|
field=models.CharField(default='', max_length=10),
|
||||||
|
),
|
||||||
|
migrations.AddField(
|
||||||
|
model_name='oauthgrant',
|
||||||
|
name='nonce',
|
||||||
|
field=models.CharField(default='', max_length=255),
|
||||||
|
),
|
||||||
|
migrations.AlterField(
|
||||||
|
model_name='oauthapplication',
|
||||||
|
name='client_secret',
|
||||||
|
field=oauth2_provider.models.ClientSecretField(db_index=True, default=oauth2_provider.generators.generate_client_secret, max_length=255),
|
||||||
|
),
|
||||||
|
migrations.CreateModel(
|
||||||
|
name='OAuthIDToken',
|
||||||
|
fields=[
|
||||||
|
('id', models.BigAutoField(primary_key=True, serialize=False)),
|
||||||
|
('jti', models.UUIDField(default=uuid.uuid4, unique=True)),
|
||||||
|
('expires', models.DateTimeField()),
|
||||||
|
('scope', models.TextField()),
|
||||||
|
('created', models.DateTimeField(auto_now_add=True)),
|
||||||
|
('updated', models.DateTimeField(auto_now=True)),
|
||||||
|
('application', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to=settings.OAUTH2_PROVIDER_APPLICATION_MODEL)),
|
||||||
|
('organizers', models.ManyToManyField(to='pretixbase.Organizer')),
|
||||||
|
('user', models.ForeignKey(null=True, on_delete=django.db.models.deletion.CASCADE, related_name='pretixapi_oauthidtoken', to=settings.AUTH_USER_MODEL)),
|
||||||
|
],
|
||||||
|
options={
|
||||||
|
'abstract': False,
|
||||||
|
},
|
||||||
|
),
|
||||||
|
migrations.AddField(
|
||||||
|
model_name='oauthaccesstoken',
|
||||||
|
name='id_token',
|
||||||
|
field=models.OneToOneField(null=True, on_delete=django.db.models.deletion.CASCADE, related_name='access_token', to='pretixapi.oauthidtoken'),
|
||||||
|
),
|
||||||
|
]
|
||||||
@@ -29,8 +29,8 @@ from oauth2_provider.generators import (
|
|||||||
generate_client_id, generate_client_secret,
|
generate_client_id, generate_client_secret,
|
||||||
)
|
)
|
||||||
from oauth2_provider.models import (
|
from oauth2_provider.models import (
|
||||||
AbstractAccessToken, AbstractApplication, AbstractGrant,
|
AbstractAccessToken, AbstractApplication, AbstractGrant, AbstractIDToken,
|
||||||
AbstractRefreshToken,
|
AbstractRefreshToken, ClientSecretField,
|
||||||
)
|
)
|
||||||
from oauth2_provider.validators import URIValidator
|
from oauth2_provider.validators import URIValidator
|
||||||
|
|
||||||
@@ -46,7 +46,7 @@ class OAuthApplication(AbstractApplication):
|
|||||||
verbose_name=_("Client ID"),
|
verbose_name=_("Client ID"),
|
||||||
max_length=100, unique=True, default=generate_client_id, db_index=True
|
max_length=100, unique=True, default=generate_client_id, db_index=True
|
||||||
)
|
)
|
||||||
client_secret = models.CharField(
|
client_secret = ClientSecretField(
|
||||||
verbose_name=_("Client secret"),
|
verbose_name=_("Client secret"),
|
||||||
max_length=255, blank=False, default=generate_client_secret, db_index=True
|
max_length=255, blank=False, default=generate_client_secret, db_index=True
|
||||||
)
|
)
|
||||||
@@ -67,12 +67,26 @@ class OAuthGrant(AbstractGrant):
|
|||||||
redirect_uri = models.CharField(max_length=2500) # Only 255 in AbstractGrant, which caused problems
|
redirect_uri = models.CharField(max_length=2500) # Only 255 in AbstractGrant, which caused problems
|
||||||
|
|
||||||
|
|
||||||
|
class OAuthIDToken(AbstractIDToken):
|
||||||
|
application = models.ForeignKey(
|
||||||
|
OAuthApplication, on_delete=models.CASCADE,
|
||||||
|
)
|
||||||
|
organizers = models.ManyToManyField('pretixbase.Organizer')
|
||||||
|
|
||||||
|
|
||||||
class OAuthAccessToken(AbstractAccessToken):
|
class OAuthAccessToken(AbstractAccessToken):
|
||||||
source_refresh_token = models.OneToOneField(
|
source_refresh_token = models.OneToOneField(
|
||||||
# unique=True implied by the OneToOneField
|
# unique=True implied by the OneToOneField
|
||||||
'OAuthRefreshToken', on_delete=models.SET_NULL, blank=True, null=True,
|
'OAuthRefreshToken', on_delete=models.SET_NULL, blank=True, null=True,
|
||||||
related_name="refreshed_access_token"
|
related_name="refreshed_access_token"
|
||||||
)
|
)
|
||||||
|
id_token = models.OneToOneField(
|
||||||
|
OAuthIDToken,
|
||||||
|
on_delete=models.CASCADE,
|
||||||
|
blank=True,
|
||||||
|
null=True,
|
||||||
|
related_name="access_token",
|
||||||
|
)
|
||||||
application = models.ForeignKey(
|
application = models.ForeignKey(
|
||||||
OAuthApplication, on_delete=models.CASCADE, blank=True, null=True,
|
OAuthApplication, on_delete=models.CASCADE, blank=True, null=True,
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -65,6 +65,10 @@ class OAuthApplicationRegistrationView(ApplicationRegistration):
|
|||||||
def form_valid(self, form):
|
def form_valid(self, form):
|
||||||
form.instance.client_type = 'confidential'
|
form.instance.client_type = 'confidential'
|
||||||
form.instance.authorization_grant_type = 'authorization-code'
|
form.instance.authorization_grant_type = 'authorization-code'
|
||||||
|
secret = generate_client_secret()
|
||||||
|
messages.success(self.request, _('Your application has been created and an application secret has been generated. '
|
||||||
|
'Please copy and save it right now as it will not be shown again: {secret}').format(secret=secret))
|
||||||
|
form.instance.client_secret = secret
|
||||||
oauth_application_registered.send(
|
oauth_application_registered.send(
|
||||||
sender=self.request, user=self.request.user, application=form.instance
|
sender=self.request, user=self.request.user, application=form.instance
|
||||||
)
|
)
|
||||||
@@ -74,18 +78,14 @@ class OAuthApplicationRegistrationView(ApplicationRegistration):
|
|||||||
class ApplicationUpdateForm(forms.ModelForm):
|
class ApplicationUpdateForm(forms.ModelForm):
|
||||||
class Meta:
|
class Meta:
|
||||||
model = OAuthApplication
|
model = OAuthApplication
|
||||||
fields = ("name", "client_id", "client_secret", "redirect_uris")
|
fields = ("name", "client_id", "redirect_uris")
|
||||||
|
|
||||||
def clean_client_id(self):
|
def clean_client_id(self):
|
||||||
return self.instance.client_id
|
return self.instance.client_id
|
||||||
|
|
||||||
def clean_client_secret(self):
|
|
||||||
return self.instance.client_secret
|
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
super().__init__(*args, **kwargs)
|
super().__init__(*args, **kwargs)
|
||||||
self.fields['client_id'].widget.attrs['readonly'] = True
|
self.fields['client_id'].widget.attrs['readonly'] = True
|
||||||
self.fields['client_secret'].widget.attrs['readonly'] = True
|
|
||||||
|
|
||||||
|
|
||||||
class OAuthApplicationUpdateView(ApplicationUpdate):
|
class OAuthApplicationUpdateView(ApplicationUpdate):
|
||||||
@@ -103,8 +103,10 @@ class OAuthApplicationRollView(ApplicationDetail):
|
|||||||
|
|
||||||
def post(self, request, *args, **kwargs):
|
def post(self, request, *args, **kwargs):
|
||||||
self.object = self.get_object()
|
self.object = self.get_object()
|
||||||
messages.success(request, _('A new client secret has been generated and is now effective.'))
|
secret = generate_client_secret()
|
||||||
self.object.client_secret = generate_client_secret()
|
messages.success(request, _('A new client secret has been generated. Please copy and save it right now as '
|
||||||
|
'it will not be shown again: {secret}').format(secret=secret))
|
||||||
|
self.object.client_secret = secret
|
||||||
self.object.save()
|
self.object.save()
|
||||||
return HttpResponseRedirect(self.object.get_absolute_url())
|
return HttpResponseRedirect(self.object.get_absolute_url())
|
||||||
|
|
||||||
|
|||||||
@@ -861,6 +861,7 @@ AUTH_PASSWORD_VALIDATORS = [
|
|||||||
OAUTH2_PROVIDER_APPLICATION_MODEL = 'pretixapi.OAuthApplication'
|
OAUTH2_PROVIDER_APPLICATION_MODEL = 'pretixapi.OAuthApplication'
|
||||||
OAUTH2_PROVIDER_GRANT_MODEL = 'pretixapi.OAuthGrant'
|
OAUTH2_PROVIDER_GRANT_MODEL = 'pretixapi.OAuthGrant'
|
||||||
OAUTH2_PROVIDER_ACCESS_TOKEN_MODEL = 'pretixapi.OAuthAccessToken'
|
OAUTH2_PROVIDER_ACCESS_TOKEN_MODEL = 'pretixapi.OAuthAccessToken'
|
||||||
|
OAUTH2_PROVIDER_ID_TOKEN_MODEL = 'pretixapi.OAuthIDToken'
|
||||||
OAUTH2_PROVIDER_REFRESH_TOKEN_MODEL = 'pretixapi.OAuthRefreshToken'
|
OAUTH2_PROVIDER_REFRESH_TOKEN_MODEL = 'pretixapi.OAuthRefreshToken'
|
||||||
OAUTH2_PROVIDER = {
|
OAUTH2_PROVIDER = {
|
||||||
'SCOPES': {
|
'SCOPES': {
|
||||||
@@ -872,7 +873,8 @@ OAUTH2_PROVIDER = {
|
|||||||
'ALLOWED_REDIRECT_URI_SCHEMES': ['https'] if not DEBUG else ['http', 'https'],
|
'ALLOWED_REDIRECT_URI_SCHEMES': ['https'] if not DEBUG else ['http', 'https'],
|
||||||
'ACCESS_TOKEN_EXPIRE_SECONDS': 3600 * 24,
|
'ACCESS_TOKEN_EXPIRE_SECONDS': 3600 * 24,
|
||||||
'ROTATE_REFRESH_TOKEN': False,
|
'ROTATE_REFRESH_TOKEN': False,
|
||||||
|
'PKCE_REQUIRED': False,
|
||||||
|
'OIDC_RESPONSE_TYPES_SUPPORTED': ["code"], # We don't support proper OIDC for now
|
||||||
}
|
}
|
||||||
|
|
||||||
COUNTRIES_OVERRIDE = {
|
COUNTRIES_OVERRIDE = {
|
||||||
|
|||||||
@@ -182,7 +182,7 @@ setup(
|
|||||||
'django-localflavor==3.1',
|
'django-localflavor==3.1',
|
||||||
'django-markup',
|
'django-markup',
|
||||||
'django-mysql',
|
'django-mysql',
|
||||||
'django-oauth-toolkit==1.2.*',
|
'django-oauth-toolkit==2.2.*',
|
||||||
'django-otp==1.1.*',
|
'django-otp==1.1.*',
|
||||||
'django-phonenumber-field==7.0.*',
|
'django-phonenumber-field==7.0.*',
|
||||||
'django-redis==5.2.*',
|
'django-redis==5.2.*',
|
||||||
|
|||||||
@@ -37,6 +37,7 @@ import json
|
|||||||
from urllib.parse import quote
|
from urllib.parse import quote
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
from django.utils.crypto import get_random_string
|
||||||
|
|
||||||
from pretix.api.models import (
|
from pretix.api.models import (
|
||||||
OAuthAccessToken, OAuthApplication, OAuthGrant, OAuthRefreshToken,
|
OAuthAccessToken, OAuthApplication, OAuthGrant, OAuthRefreshToken,
|
||||||
@@ -64,12 +65,17 @@ def admin_user(admin_team):
|
|||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def application():
|
def application():
|
||||||
return OAuthApplication.objects.create(
|
secret = get_random_string(32)
|
||||||
|
a = OAuthApplication.objects.create(
|
||||||
name="pretalx",
|
name="pretalx",
|
||||||
redirect_uris="https://pretalx.com",
|
redirect_uris="https://pretalx.com",
|
||||||
client_type='confidential',
|
client_type='confidential',
|
||||||
|
client_secret=secret,
|
||||||
authorization_grant_type='authorization-code'
|
authorization_grant_type='authorization-code'
|
||||||
)
|
)
|
||||||
|
a._cached_secret = secret
|
||||||
|
a.save()
|
||||||
|
return a
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
@@ -267,7 +273,7 @@ def test_token_from_code(client, admin_user, organizer, application: OAuthApplic
|
|||||||
'redirect_uri': application.redirect_uris,
|
'redirect_uri': application.redirect_uris,
|
||||||
'grant_type': 'authorization_code',
|
'grant_type': 'authorization_code',
|
||||||
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(
|
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(
|
||||||
('%s:%s' % (application.client_id, application.client_secret)).encode()).decode())
|
('%s:%s' % (application.client_id, application._cached_secret)).encode()).decode())
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
data = json.loads(resp.content.decode())
|
data = json.loads(resp.content.decode())
|
||||||
assert data['expires_in'] == 86400
|
assert data['expires_in'] == 86400
|
||||||
@@ -306,7 +312,7 @@ def test_use_token_for_access_one_organizer(client, admin_user, organizer, appli
|
|||||||
'redirect_uri': application.redirect_uris,
|
'redirect_uri': application.redirect_uris,
|
||||||
'grant_type': 'authorization_code',
|
'grant_type': 'authorization_code',
|
||||||
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(
|
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(
|
||||||
('%s:%s' % (application.client_id, application.client_secret)).encode()).decode())
|
('%s:%s' % (application.client_id, application._cached_secret)).encode()).decode())
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
data = json.loads(resp.content.decode())
|
data = json.loads(resp.content.decode())
|
||||||
access_token = data['access_token']
|
access_token = data['access_token']
|
||||||
@@ -348,7 +354,7 @@ def test_use_token_for_access_two_organizers(client, admin_user, organizer, appl
|
|||||||
'redirect_uri': application.redirect_uris,
|
'redirect_uri': application.redirect_uris,
|
||||||
'grant_type': 'authorization_code',
|
'grant_type': 'authorization_code',
|
||||||
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(
|
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(
|
||||||
('%s:%s' % (application.client_id, application.client_secret)).encode()).decode())
|
('%s:%s' % (application.client_id, application._cached_secret)).encode()).decode())
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
data = json.loads(resp.content.decode())
|
data = json.loads(resp.content.decode())
|
||||||
access_token = data['access_token']
|
access_token = data['access_token']
|
||||||
@@ -389,7 +395,7 @@ def test_token_refresh(client, admin_user, organizer, application: OAuthApplicat
|
|||||||
'redirect_uri': application.redirect_uris,
|
'redirect_uri': application.redirect_uris,
|
||||||
'grant_type': 'authorization_code',
|
'grant_type': 'authorization_code',
|
||||||
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(
|
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(
|
||||||
('%s:%s' % (application.client_id, application.client_secret)).encode()).decode())
|
('%s:%s' % (application.client_id, application._cached_secret)).encode()).decode())
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
data = json.loads(resp.content.decode())
|
data = json.loads(resp.content.decode())
|
||||||
refresh_token = data['refresh_token']
|
refresh_token = data['refresh_token']
|
||||||
@@ -398,7 +404,7 @@ def test_token_refresh(client, admin_user, organizer, application: OAuthApplicat
|
|||||||
'refresh_token': refresh_token,
|
'refresh_token': refresh_token,
|
||||||
'grant_type': 'refresh_token',
|
'grant_type': 'refresh_token',
|
||||||
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(
|
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(
|
||||||
('%s:%s' % (application.client_id, application.client_secret)).encode()).decode())
|
('%s:%s' % (application.client_id, application._cached_secret)).encode()).decode())
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
assert not OAuthAccessToken.objects.filter(token=access_token).exists() # old token revoked
|
assert not OAuthAccessToken.objects.filter(token=access_token).exists() # old token revoked
|
||||||
data = json.loads(resp.content.decode())
|
data = json.loads(resp.content.decode())
|
||||||
@@ -431,7 +437,7 @@ def test_allow_write(client, admin_user, organizer, application: OAuthApplicatio
|
|||||||
'redirect_uri': application.redirect_uris,
|
'redirect_uri': application.redirect_uris,
|
||||||
'grant_type': 'authorization_code',
|
'grant_type': 'authorization_code',
|
||||||
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(
|
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(
|
||||||
('%s:%s' % (application.client_id, application.client_secret)).encode()).decode())
|
('%s:%s' % (application.client_id, application._cached_secret)).encode()).decode())
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
data = json.loads(resp.content.decode())
|
data = json.loads(resp.content.decode())
|
||||||
access_token = data['access_token']
|
access_token = data['access_token']
|
||||||
@@ -463,7 +469,7 @@ def test_allow_read_only(client, admin_user, organizer, application: OAuthApplic
|
|||||||
'redirect_uri': application.redirect_uris,
|
'redirect_uri': application.redirect_uris,
|
||||||
'grant_type': 'authorization_code',
|
'grant_type': 'authorization_code',
|
||||||
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(
|
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(
|
||||||
('%s:%s' % (application.client_id, application.client_secret)).encode()).decode())
|
('%s:%s' % (application.client_id, application._cached_secret)).encode()).decode())
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
data = json.loads(resp.content.decode())
|
data = json.loads(resp.content.decode())
|
||||||
access_token = data['access_token']
|
access_token = data['access_token']
|
||||||
@@ -495,7 +501,7 @@ def test_token_revoke_refresh_token(client, admin_user, organizer, application:
|
|||||||
'redirect_uri': application.redirect_uris,
|
'redirect_uri': application.redirect_uris,
|
||||||
'grant_type': 'authorization_code',
|
'grant_type': 'authorization_code',
|
||||||
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(
|
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(
|
||||||
('%s:%s' % (application.client_id, application.client_secret)).encode()).decode())
|
('%s:%s' % (application.client_id, application._cached_secret)).encode()).decode())
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
data = json.loads(resp.content.decode())
|
data = json.loads(resp.content.decode())
|
||||||
refresh_token = data['refresh_token']
|
refresh_token = data['refresh_token']
|
||||||
@@ -503,7 +509,7 @@ def test_token_revoke_refresh_token(client, admin_user, organizer, application:
|
|||||||
resp = client.post('/api/v1/oauth/revoke_token', data={
|
resp = client.post('/api/v1/oauth/revoke_token', data={
|
||||||
'token': refresh_token,
|
'token': refresh_token,
|
||||||
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(
|
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(
|
||||||
('%s:%s' % (application.client_id, application.client_secret)).encode()).decode())
|
('%s:%s' % (application.client_id, application._cached_secret)).encode()).decode())
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
assert not OAuthAccessToken.objects.get(token=access_token).is_valid()
|
assert not OAuthAccessToken.objects.get(token=access_token).is_valid()
|
||||||
assert not OAuthRefreshToken.objects.filter(token=refresh_token, revoked__isnull=True).exists()
|
assert not OAuthRefreshToken.objects.filter(token=refresh_token, revoked__isnull=True).exists()
|
||||||
@@ -511,7 +517,7 @@ def test_token_revoke_refresh_token(client, admin_user, organizer, application:
|
|||||||
'refresh_token': refresh_token,
|
'refresh_token': refresh_token,
|
||||||
'grant_type': 'refresh_token',
|
'grant_type': 'refresh_token',
|
||||||
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(
|
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(
|
||||||
('%s:%s' % (application.client_id, application.client_secret)).encode()).decode())
|
('%s:%s' % (application.client_id, application._cached_secret)).encode()).decode())
|
||||||
assert resp.status_code == 400
|
assert resp.status_code == 400
|
||||||
|
|
||||||
|
|
||||||
@@ -539,7 +545,7 @@ def test_token_revoke_access_token(client, admin_user, organizer, application: O
|
|||||||
'redirect_uri': application.redirect_uris,
|
'redirect_uri': application.redirect_uris,
|
||||||
'grant_type': 'authorization_code',
|
'grant_type': 'authorization_code',
|
||||||
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(
|
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(
|
||||||
('%s:%s' % (application.client_id, application.client_secret)).encode()).decode())
|
('%s:%s' % (application.client_id, application._cached_secret)).encode()).decode())
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
data = json.loads(resp.content.decode())
|
data = json.loads(resp.content.decode())
|
||||||
refresh_token = data['refresh_token']
|
refresh_token = data['refresh_token']
|
||||||
@@ -547,7 +553,7 @@ def test_token_revoke_access_token(client, admin_user, organizer, application: O
|
|||||||
resp = client.post('/api/v1/oauth/revoke_token', data={
|
resp = client.post('/api/v1/oauth/revoke_token', data={
|
||||||
'token': access_token,
|
'token': access_token,
|
||||||
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(
|
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(
|
||||||
('%s:%s' % (application.client_id, application.client_secret)).encode()).decode())
|
('%s:%s' % (application.client_id, application._cached_secret)).encode()).decode())
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
assert not OAuthAccessToken.objects.get(token=access_token).is_valid() # old token revoked
|
assert not OAuthAccessToken.objects.get(token=access_token).is_valid() # old token revoked
|
||||||
|
|
||||||
@@ -555,7 +561,7 @@ def test_token_revoke_access_token(client, admin_user, organizer, application: O
|
|||||||
'refresh_token': refresh_token,
|
'refresh_token': refresh_token,
|
||||||
'grant_type': 'refresh_token',
|
'grant_type': 'refresh_token',
|
||||||
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(
|
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(
|
||||||
('%s:%s' % (application.client_id, application.client_secret)).encode()).decode())
|
('%s:%s' % (application.client_id, application._cached_secret)).encode()).decode())
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
data = json.loads(resp.content.decode())
|
data = json.loads(resp.content.decode())
|
||||||
access_token = data['access_token']
|
access_token = data['access_token']
|
||||||
@@ -587,7 +593,7 @@ def test_user_revoke(client, admin_user, organizer, application: OAuthApplicatio
|
|||||||
'redirect_uri': application.redirect_uris,
|
'redirect_uri': application.redirect_uris,
|
||||||
'grant_type': 'authorization_code',
|
'grant_type': 'authorization_code',
|
||||||
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(
|
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(
|
||||||
('%s:%s' % (application.client_id, application.client_secret)).encode()).decode())
|
('%s:%s' % (application.client_id, application._cached_secret)).encode()).decode())
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
data = json.loads(resp.content.decode())
|
data = json.loads(resp.content.decode())
|
||||||
refresh_token = data['refresh_token']
|
refresh_token = data['refresh_token']
|
||||||
@@ -606,7 +612,7 @@ def test_user_revoke(client, admin_user, organizer, application: OAuthApplicatio
|
|||||||
'refresh_token': refresh_token,
|
'refresh_token': refresh_token,
|
||||||
'grant_type': 'refresh_token',
|
'grant_type': 'refresh_token',
|
||||||
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(
|
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(
|
||||||
('%s:%s' % (application.client_id, application.client_secret)).encode()).decode())
|
('%s:%s' % (application.client_id, application._cached_secret)).encode()).decode())
|
||||||
assert resp.status_code == 400
|
assert resp.status_code == 400
|
||||||
|
|
||||||
|
|
||||||
@@ -634,7 +640,7 @@ def test_allow_profile_only(client, admin_user, organizer, application: OAuthApp
|
|||||||
'redirect_uri': application.redirect_uris,
|
'redirect_uri': application.redirect_uris,
|
||||||
'grant_type': 'authorization_code',
|
'grant_type': 'authorization_code',
|
||||||
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(
|
}, HTTP_AUTHORIZATION='Basic ' + base64.b64encode(
|
||||||
('%s:%s' % (application.client_id, application.client_secret)).encode()).decode())
|
('%s:%s' % (application.client_id, application._cached_secret)).encode()).decode())
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
data = json.loads(resp.content.decode())
|
data = json.loads(resp.content.decode())
|
||||||
access_token = data['access_token']
|
access_token = data['access_token']
|
||||||
@@ -642,3 +648,18 @@ def test_allow_profile_only(client, admin_user, organizer, application: OAuthApp
|
|||||||
assert resp.status_code == 403
|
assert resp.status_code == 403
|
||||||
resp = client.get('/api/v1/me', HTTP_AUTHORIZATION='Bearer %s' % access_token)
|
resp = client.get('/api/v1/me', HTTP_AUTHORIZATION='Bearer %s' % access_token)
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_reject_other_response_types(client, admin_user, organizer, application: OAuthApplication):
|
||||||
|
client.login(email='dummy@dummy.dummy', password='dummy')
|
||||||
|
resp = client.get('/api/v1/oauth/authorize?client_id=%s&redirect_uri=%s&response_type=code+id_token' % (
|
||||||
|
application.client_id, quote(application.redirect_uris)
|
||||||
|
))
|
||||||
|
assert resp.status_code == 302
|
||||||
|
assert 'error=unauthorized_client' in resp['Location']
|
||||||
|
resp = client.get('/api/v1/oauth/authorize?client_id=%s&redirect_uri=%s&response_type=id_token' % (
|
||||||
|
application.client_id, quote(application.redirect_uris)
|
||||||
|
))
|
||||||
|
assert resp.status_code == 302
|
||||||
|
assert 'error=unsupported_response_type' in resp['Location']
|
||||||
|
|||||||
Reference in New Issue
Block a user