OpenID Connect RP support for customer accounts

This commit is contained in:
Raphael Michel
2022-07-11 12:45:51 +02:00
committed by Raphael Michel
parent e102a590ab
commit 7f5518dbf6
39 changed files with 1943 additions and 55 deletions

View File

@@ -140,6 +140,29 @@ def test_customer_patch(token_client, organizer, customer):
assert customer.email == 'blubb@example.org'
@pytest.mark.django_db
def test_customer_patch_with_provider(token_client, organizer, customer):
with scopes_disabled():
customer.provider = organizer.sso_providers.create(
method="oidc",
name="OIDC OP",
configuration={}
)
customer.external_identifier = "123"
customer.save()
resp = token_client.patch(
'/api/v1/organizers/{}/customers/{}/'.format(organizer.slug, customer.identifier),
format='json',
data={
'external_identifier': '234',
}
)
assert resp.status_code == 200
customer.refresh_from_db()
assert customer.external_identifier == "123"
@pytest.mark.django_db
def test_customer_anonymize(token_client, organizer, customer):
resp = token_client.post(

View File

@@ -0,0 +1,331 @@
#
# This file is part of pretix (Community Edition).
#
# Copyright (C) 2014-2020 Raphael Michel and contributors
# Copyright (C) 2020-2021 rami.io GmbH and contributors
#
# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General
# Public License as published by the Free Software Foundation in version 3 of the License.
#
# ADDITIONAL TERMS APPLY: Pursuant to Section 7 of the GNU Affero General Public License, additional terms are
# applicable granting you additional permissions and placing additional restrictions on your usage of this software.
# Please refer to the pretix LICENSE file to obtain the full terms applicable to this work. If you did not receive
# this file, see <https://pretix.eu/about/en/license>.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
# <https://www.gnu.org/licenses/>.
#
import pytest
import responses
from django.core.exceptions import ValidationError
from responses import matchers
from pretix.base.customersso.oidc import (
oidc_authorize_url, oidc_validate_and_complete_config,
oidc_validate_authorization,
)
from pretix.base.models import Organizer
from pretix.base.models.customers import CustomerSSOProvider
def test_missing_parameter():
config = {
"base_url": "https://example.com",
"client_id": "abc123",
"client_secret": "abcdefghi",
"uid_field": "sub",
}
with pytest.raises(ValidationError) as e:
oidc_validate_and_complete_config(config)
assert '"email_field" is missing' in str(e.value)
@responses.activate
def test_autoconf_unreachable():
config = {
"base_url": "https://example.com/provider",
"client_id": "abc123",
"client_secret": "abcdefghi",
"uid_field": "sub",
"email_field": "email",
"scope": "foo bar",
}
responses.add(
responses.GET,
"https://example.com/provider/.well-known/openid-configuration",
json={"error": "not found"},
status=404
)
with pytest.raises(ValidationError) as e:
oidc_validate_and_complete_config(config)
assert "Unable to retrieve" in str(e.value)
assert "404" in str(e.value)
@responses.activate
def test_incompatible():
config = {
"base_url": "https://example.com/provider",
"client_id": "abc123",
"client_secret": "abcdefghi",
"uid_field": "sub",
"email_field": "email",
"scope": "foo bar",
}
responses.add(
responses.GET,
"https://example.com/provider/.well-known/openid-configuration",
json={},
)
with pytest.raises(ValidationError) as e:
oidc_validate_and_complete_config(config)
assert "authorization_endpoint not set" in str(e.value)
responses.reset()
responses.add(
responses.GET,
"https://example.com/provider/.well-known/openid-configuration",
json={
"authorization_endpoint": "https://example.com/authorize",
},
)
with pytest.raises(ValidationError) as e:
oidc_validate_and_complete_config(config)
assert "userinfo_endpoint not set" in str(e.value)
responses.reset()
responses.add(
responses.GET,
"https://example.com/provider/.well-known/openid-configuration",
json={
"authorization_endpoint": "https://example.com/authorize",
"userinfo_endpoint": "https://example.com/userinfo",
},
)
with pytest.raises(ValidationError) as e:
oidc_validate_and_complete_config(config)
assert "token_endpoint not set" in str(e.value)
responses.reset()
responses.add(
responses.GET,
"https://example.com/provider/.well-known/openid-configuration",
json={
"authorization_endpoint": "https://example.com/authorize",
"token_endpoint": "https://example.com/token",
"userinfo_endpoint": "https://example.com/userinfo",
},
)
with pytest.raises(ValidationError) as e:
oidc_validate_and_complete_config(config)
assert "provider supports response types" in str(e.value)
responses.reset()
responses.add(
responses.GET,
"https://example.com/provider/.well-known/openid-configuration",
json={
"authorization_endpoint": "https://example.com/authorize",
"token_endpoint": "https://example.com/token",
"userinfo_endpoint": "https://example.com/userinfo",
"response_types_supported": ["code"],
"response_modes_supported": ["bogus"],
},
)
with pytest.raises(ValidationError) as e:
oidc_validate_and_complete_config(config)
assert "provider supports response modes" in str(e.value)
responses.reset()
responses.add(
responses.GET,
"https://example.com/provider/.well-known/openid-configuration",
json={
"authorization_endpoint": "https://example.com/authorize",
"token_endpoint": "https://example.com/token",
"userinfo_endpoint": "https://example.com/userinfo",
"response_types_supported": ["code"],
"response_modes_supported": ["query"],
"grant_types_supported": ["test"],
},
)
with pytest.raises(ValidationError) as e:
oidc_validate_and_complete_config(config)
assert "provider supports grant types" in str(e.value)
responses.reset()
responses.add(
responses.GET,
"https://example.com/provider/.well-known/openid-configuration",
json={
"authorization_endpoint": "https://example.com/authorize",
"token_endpoint": "https://example.com/token",
"userinfo_endpoint": "https://example.com/userinfo",
"response_types_supported": ["code"],
"response_modes_supported": ["query"],
"grant_types_supported": ["authorization_code"],
},
)
with pytest.raises(ValidationError) as e:
oidc_validate_and_complete_config(config)
assert "not requesting" in str(e.value)
config["scope"] = "openid foo"
with pytest.raises(ValidationError) as e:
oidc_validate_and_complete_config(config)
assert "requesting scope" in str(e.value)
@responses.activate
def test_compatible():
config = {
"base_url": "https://example.com/provider",
"client_id": "abc123",
"client_secret": "abcdefghi",
"uid_field": "sub",
"email_field": "email",
"scope": "openid email profile",
}
responses.add(
responses.GET,
"https://example.com/provider/.well-known/openid-configuration",
json={
"authorization_endpoint": "https://example.com/authorize",
"token_endpoint": "https://example.com/token",
"userinfo_endpoint": "https://example.com/userinfo",
"response_types_supported": ["code"],
"response_modes_supported": ["query"],
"grant_types_supported": ["authorization_code"],
"scopes_supported": ["openid", "email", "profile"],
"claims_supported": ["email", "sub"]
},
)
config = oidc_validate_and_complete_config(config)
assert config["provider_config"]["token_endpoint"] == "https://example.com/token"
@pytest.fixture
def organizer():
return Organizer.objects.create(name="Dummy", slug="dummy")
@pytest.fixture
def provider(organizer):
return CustomerSSOProvider.objects.create(
organizer=organizer,
method="oidc",
name="OIDC OP",
configuration={
"base_url": "https://example.com/provider",
"client_id": "abc123",
"client_secret": "abcdefghi",
"uid_field": "sub",
"email_field": "email",
"scope": "openid email profile",
"provider_config": {
"authorization_endpoint": "https://example.com/authorize",
"token_endpoint": "https://example.com/token",
"userinfo_endpoint": "https://example.com/userinfo",
"response_types_supported": ["code"],
"response_modes_supported": ["query"],
"grant_types_supported": ["authorization_code"],
"scopes_supported": ["openid", "email", "profile"],
"claims_supported": ["email", "sub"]
}
}
)
@pytest.mark.django_db
def test_authorize_url(provider):
assert (
"https://example.com/authorize?"
"response_type=code&"
"client_id=abc123&"
"scope=openid+email+profile&"
"state=state_val&"
"redirect_uri=https%3A%2F%2Fredirect%3Ffoo%3Dbar"
) == oidc_authorize_url(provider, "state_val", "https://redirect?foo=bar")
@pytest.mark.django_db
@responses.activate
def test_validate_authorization_invalid(provider):
responses.add(
responses.POST,
"https://example.com/token",
json={},
status=400,
)
with pytest.raises(ValidationError):
oidc_validate_authorization(provider, "code_received", "https://redirect?foo=bar")
@pytest.mark.django_db
@responses.activate
def test_validate_authorization_userinfo_invalid(provider):
responses.add(
responses.POST,
"https://example.com/token",
json={
'access_token': 'test_access_token',
},
match=[
matchers.urlencoded_params_matcher({
"grant_type": "authorization_code",
"code": "code_received",
"redirect_uri": "https://redirect?foo=bar",
})
],
)
responses.add(
responses.GET,
"https://example.com/userinfo",
json={
'uid': 'abcdf',
'email': 'test@example.org'
},
match=[
matchers.header_matcher({"Authorization": "Bearer test_access_token"})
],
)
with pytest.raises(ValidationError) as e:
oidc_validate_authorization(provider, "code_received", "https://redirect?foo=bar")
assert 'could not fetch' in str(e.value)
@pytest.mark.django_db
@responses.activate
def test_validate_authorization_valid(provider):
responses.add(
responses.POST,
"https://example.com/token",
json={
'access_token': 'test_access_token',
},
match=[
matchers.urlencoded_params_matcher({
"grant_type": "authorization_code",
"code": "code_received",
"redirect_uri": "https://redirect?foo=bar",
})
],
)
responses.add(
responses.GET,
"https://example.com/userinfo",
json={
'sub': 'abcdf',
'email': 'test@example.org'
},
match=[
matchers.header_matcher({"Authorization": "Bearer test_access_token"})
],
)
oidc_validate_authorization(provider, "code_received", "https://redirect?foo=bar")

View File

@@ -31,6 +31,7 @@ from tests.base import extract_form_fields
from pretix.base.models import (
Item, Order, OrderPosition, Organizer, Team, User,
)
from pretix.base.models.customers import CustomerSSOProvider
@pytest.fixture
@@ -90,6 +91,16 @@ def admin_user(organizer):
return u
@pytest.fixture
def provider(organizer):
return CustomerSSOProvider.objects.create(
organizer=organizer,
method="oidc",
name="OIDC OP",
configuration={}
)
@pytest.mark.django_db
def test_list_of_customers(organizer, admin_user, client, customer):
client.login(email='dummy@dummy.dummy', password='dummy')
@@ -125,6 +136,25 @@ def test_customer_update(organizer, admin_user, customer, client):
assert customer.is_verified
@pytest.mark.django_db
def test_customer_update_email_not_allowed_for_sso_customers(organizer, admin_user, customer, client, provider):
customer.provider = provider
customer.save()
client.login(email='dummy@dummy.dummy', password='dummy')
resp = client.get('/control/organizer/dummy/customer/{}/edit'.format(customer.identifier))
doc = BeautifulSoup(resp.content, "lxml")
d = extract_form_fields(doc)
d['name_parts_0'] = 'John Doe'
d['email'] = 'customer@example.net'
d['external_identifier'] = 'aaaaaaa'
resp = client.post('/control/organizer/dummy/customer/{}/edit'.format(customer.identifier), d)
assert resp.status_code == 302
customer.refresh_from_db()
assert customer.name == 'John Doe'
assert customer.email == "john@example.org"
assert not customer.external_identifier
@pytest.mark.django_db
def test_customer_anonymize(organizer, admin_user, customer, client, order):
customer.is_active = True

View File

@@ -23,6 +23,7 @@ import datetime
from smtplib import SMTPResponseException
import pytest
import responses
from django.db import transaction
from django.test.utils import override_settings
from django_scopes import scopes_disabled
@@ -292,3 +293,41 @@ class OrganizerTest(SoupTest):
self.orga1.settings.flush()
assert "smtp_use_custom" not in self.orga1.settings._cache()
assert "mail_from" not in self.orga1.settings._cache()
@responses.activate
def test_create_sso_provider(self):
conf = {
"authorization_endpoint": "https://example.com/authorize",
"token_endpoint": "https://example.com/token",
"userinfo_endpoint": "https://example.com/userinfo",
"response_types_supported": ["code"],
"response_modes_supported": ["query"],
"grant_types_supported": ["authorization_code"],
"scopes_supported": ["openid", "email", "profile"],
"claims_supported": ["email", "sub"]
}
responses.add(
responses.GET,
"https://example.com/provider/.well-known/openid-configuration",
json=conf
)
doc = self.post_doc(
'/control/organizer/%s/ssoprovider/add' % self.orga1.slug,
{
'name_0': 'OIDC',
'button_label_0': 'Log in with OIDC',
'method': 'oidc',
'config_oidc_base_url': 'https://example.com/provider',
'config_oidc_client_id': 'aaaa',
'config_oidc_client_secret': 'bbbb',
'config_oidc_scope': 'openid email',
'config_oidc_email_field': 'email',
'config_oidc_uid_field': 'sub',
},
follow=True
)
assert not doc.select('.has-error, .alert-danger')
with scopes_disabled():
p = self.orga1.sso_providers.get()
assert p.configuration['scope'] == 'openid email'
assert p.configuration['provider_config'] == conf

View File

@@ -204,6 +204,10 @@ organizer_urls = [
'organizer/abc/webhook/add',
'organizer/abc/webhook/1/edit',
'organizer/abc/webhook/1/logs',
'organizer/abc/ssoproviders',
'organizer/abc/ssoprovider/add',
'organizer/abc/ssoprovider/1/edit',
'organizer/abc/ssoprovider/1/delete',
'organizer/abc/customers',
'organizer/abc/customer/add',
'organizer/abc/customer/1/',
@@ -523,6 +527,10 @@ organizer_permission_urls = [
("can_change_organizer_settings", "organizer/dummy/membershiptype/add", 200),
("can_change_organizer_settings", "organizer/dummy/membershiptype/1/edit", 404),
("can_change_organizer_settings", "organizer/dummy/membershiptype/1/delete", 404),
("can_change_organizer_settings", "organizer/dummy/ssoproviders", 200),
("can_change_organizer_settings", "organizer/dummy/ssoprovider/add", 200),
("can_change_organizer_settings", "organizer/dummy/ssoprovider/1/edit", 404),
("can_change_organizer_settings", "organizer/dummy/ssoprovider/1/delete", 404),
("can_manage_customers", "organizer/dummy/customers", 200),
("can_manage_customers", "organizer/dummy/customer/ABC/edit", 404),
("can_manage_customers", "organizer/dummy/customer/ABC/anonymize", 404),

View File

@@ -31,6 +31,7 @@ from bs4 import BeautifulSoup
from django.conf import settings
from django.core import mail as djmail
from django.core.files.uploadedfile import SimpleUploadedFile
from django.core.signing import dumps
from django.test import TestCase
from django.utils.crypto import get_random_string
from django.utils.timezone import now
@@ -43,6 +44,7 @@ from pretix.base.models import (
Order, OrderPayment, OrderPosition, Organizer, Question, QuestionAnswer,
Quota, SeatingPlan, Voucher,
)
from pretix.base.models.customers import CustomerSSOProvider
from pretix.base.models.items import (
ItemAddOn, ItemBundle, ItemVariation, SubEventItem, SubEventItemVariation,
)
@@ -4238,6 +4240,44 @@ class CustomerCheckoutTestCase(BaseCheckoutTestCase, TestCase):
}, follow=False)
assert response.status_code == 200
def test_native_auth_disabled(self):
self.orga.settings.customer_accounts_native = False
response = self.client.get('/%s/%s/checkout/customer/' % (self.orga.slug, self.event.slug))
assert b'register-email' not in response.content
assert b'login-email' not in response.content
response = self.client.post('/%s/%s/checkout/customer/' % (self.orga.slug, self.event.slug), {
'customer_mode': 'register',
'register-email': 'foo@example.com',
'register-name_parts_0': 'John Doe',
}, follow=False)
assert response.status_code == 200
response = self.client.post('/%s/%s/checkout/customer/' % (self.orga.slug, self.event.slug), {
'customer_mode': 'login',
'login-email': 'john@example.org',
'login-password': 'foo',
}, follow=False)
assert response.status_code == 200
def test_sso_login(self):
with scopes_disabled():
self.customer.provider = CustomerSSOProvider.objects.create(
organizer=self.orga,
method="oidc",
name="OIDC OP",
configuration={}
)
self.customer.save()
response = self.client.post('/%s/%s/checkout/customer/' % (self.orga.slug, self.event.slug), {
'customer_mode': 'login',
'login-sso-data': dumps({'customer': self.customer.pk}, salt=f'customer_sso_popup_{self.orga.pk}'),
'login-password': 'foo',
}, follow=False)
assert response.status_code == 302
self.assertRedirects(response, '/%s/%s/checkout/questions/' % (self.orga.slug, self.event.slug),
target_status_code=200)
def test_select_membership(self):
mtype = self.orga.membership_types.create(name='Week pass', transferable=False)
mtype2 = self.orga.membership_types.create(name='Invalid pass')

View File

@@ -22,16 +22,20 @@
import datetime
from datetime import timedelta
from decimal import Decimal
from urllib.parse import parse_qs, urlparse
from urllib.parse import parse_qs, quote, urlparse
import pytest
import responses
from django.core import mail as djmail, signing
from django.core.signing import dumps
from django.test import Client
from django.utils.timezone import now
from django_scopes import scopes_disabled
from pretix.base.models import Event, Item, Order, OrderPosition, Organizer
from pretix.base.models import (
Customer, Event, Item, Order, OrderPosition, Organizer,
)
from pretix.base.models.customers import CustomerSSOProvider
from pretix.multidomain.models import KnownDomain
from pretix.presale.forms.customer import TokenGenerator
@@ -69,6 +73,27 @@ def test_disabled(env, client):
assert r.status_code == 404
@pytest.mark.django_db
def test_native_disabled(env, client):
env[0].settings.customer_accounts_native = False
r = client.get('/bigevents/account/register')
assert r.status_code == 404
r = client.get('/bigevents/account/login')
assert r.status_code == 200
r = client.get('/bigevents/account/pwreset')
assert r.status_code == 404
r = client.get('/bigevents/account/pwrecover')
assert r.status_code == 404
r = client.get('/bigevents/account/activate')
assert r.status_code == 404
r = client.get('/bigevents/account/change')
assert r.status_code == 302
r = client.get('/bigevents/account/confirmchange')
assert r.status_code == 302
r = client.get('/bigevents/account/')
assert r.status_code == 302
@pytest.mark.django_db
def test_org_register(env, client):
signer = signing.TimestampSigner(salt='customer-registration-captcha-127.0.0.1')
@@ -209,6 +234,162 @@ def test_org_login_not_active(env, client):
assert b'alert-danger' in r.content
@pytest.fixture
def provider(env):
return CustomerSSOProvider.objects.create(
organizer=env[0],
method="oidc",
name="OIDC OP",
configuration={
"base_url": "https://example.com/provider",
"client_id": "abc123",
"client_secret": "abcdefghi",
"uid_field": "sub",
"email_field": "email",
"scope": "openid email profile",
"provider_config": {
"authorization_endpoint": "https://example.com/authorize",
"token_endpoint": "https://example.com/token",
"userinfo_endpoint": "https://example.com/userinfo",
"response_types_supported": ["code"],
"response_modes_supported": ["query"],
"grant_types_supported": ["authorization_code"],
"scopes_supported": ["openid", "email", "profile"],
"claims_supported": ["email", "sub"]
}
}
)
@responses.activate
def _sso_login(client, provider, email='test@example.org', popup_origin=None, expect_fail=False):
responses.reset()
responses.add(
responses.POST,
"https://example.com/token",
json={
'access_token': 'test_access_token',
},
)
responses.add(
responses.GET,
"https://example.com/userinfo",
json={
'sub': 'abcdf',
'email': email
},
)
url = f'/bigevents/account/login/{provider.pk}/?next=/redirect'
if popup_origin:
url += '&popup_origin=' + popup_origin
r = client.get(url, follow=False)
assert r.status_code == 302
assert "/authorize" in r['Location']
u = urlparse(r['Location'])
state = parse_qs(u.query)['state'][0]
r = client.get(f'/bigevents/account/login/{provider.pk}/return?code=test_code&state={quote(state)}')
if not expect_fail:
if popup_origin:
assert r.status_code == 200
assert popup_origin in r.content.decode()
else:
assert r.status_code == 302
assert "/redirect" in r['Location']
else:
if popup_origin:
assert r.status_code == 200
assert popup_origin in r.content.decode()
else:
assert r.status_code == 302
assert "/account/login" in r['Location']
r = client.get('/bigevents/account/')
assert r.status_code == 302
@pytest.mark.django_db
def test_org_sso_login_new_customer(env, client, provider):
_sso_login(client, provider)
with scopes_disabled():
c = Customer.objects.get(provider=provider)
assert c.external_identifier == "abcdf"
r = client.get('/bigevents/account/')
assert r.status_code == 200
@pytest.mark.django_db
def test_org_sso_logout_if_provider_disabled(env, client, provider):
_sso_login(client, provider)
with scopes_disabled():
c = Customer.objects.get(provider=provider)
assert c.external_identifier == "abcdf"
r = client.get('/bigevents/account/')
assert r.status_code == 200
provider.is_active = False
provider.save()
r = client.get('/bigevents/account/')
assert r.status_code == 302
@pytest.mark.django_db
def test_org_sso_login_new_customer_popup(env, client, provider):
KnownDomain.objects.create(organizer=env[0], event=env[1], domainname="popuporigin")
_sso_login(client, provider, popup_origin="https://popuporigin")
@pytest.mark.django_db
def test_org_sso_login_new_customer_popup_invalid_origin(env, client, provider):
KnownDomain.objects.create(organizer=env[0], event=env[1], domainname="popuporigin")
with pytest.raises(AssertionError):
_sso_login(client, provider, popup_origin="https://forbidden")
@pytest.mark.django_db
def test_org_sso_login_returning_customer_new_email(env, client, provider):
_sso_login(client, provider)
with scopes_disabled():
c = Customer.objects.get(provider=provider)
r = client.get('/bigevents/account/logout')
assert r.status_code == 302
_sso_login(client, provider, 'new@example.net')
c.refresh_from_db()
assert c.email == "new@example.net"
@pytest.mark.django_db(transaction=True)
def test_org_sso_login_returning_customer_new_email_conflict(env, client, provider):
with scopes_disabled():
customer = env[0].customers.create(email='new@example.net', is_verified=True, is_active=False)
customer.set_password('foo')
customer.save()
_sso_login(client, provider)
r = client.get('/bigevents/account/logout')
assert r.status_code == 302
_sso_login(client, provider, 'new@example.net', expect_fail=True)
@pytest.mark.django_db(transaction=True)
def test_org_sso_login_new_customer_email_conflict(env, client, provider):
with scopes_disabled():
customer = env[0].customers.create(email='new@example.net', is_verified=True, is_active=False)
customer.set_password('foo')
customer.save()
_sso_login(client, provider, 'new@example.net', expect_fail=True)
@pytest.mark.django_db
@pytest.mark.parametrize("url", [
"account/change",
@@ -308,6 +489,20 @@ def test_org_order_list(env, client):
assert o3.code in content
@pytest.mark.django_db
def test_no_login_for_sso_accounts_even_if_password_is_set(env, client, provider):
with scopes_disabled():
customer = env[0].customers.create(email='john@example.org', is_verified=True, provider=provider)
customer.set_password('foo')
customer.save()
r = client.post('/bigevents/account/login', {
'email': 'john@example.org',
'password': 'foo',
})
assert r.status_code == 200
@pytest.mark.django_db
def test_change_name(env, client):
with scopes_disabled():
@@ -330,6 +525,24 @@ def test_change_name(env, client):
assert customer.name == 'John Doe'
@pytest.mark.django_db
def test_no_change_email_or_pass_for_sso_customers(env, client, provider):
_sso_login(client, provider, 'john@example.org')
r = client.post('/bigevents/account/change', {
'name_parts_0': 'Johnny',
'email': 'john@example.com',
})
assert r.status_code == 302
with scopes_disabled():
customer = Customer.objects.get(provider=provider)
customer.refresh_from_db()
assert customer.email == 'john@example.org'
assert customer.name == 'Johnny'
assert len(djmail.outbox) == 0
r = client.get('/bigevents/account/password')
assert r.status_code == 404
@pytest.mark.django_db
def test_change_email(env, client):
with scopes_disabled():
@@ -567,3 +780,59 @@ def test_cross_domain_login_validate_redirect_url(env, client, client2):
assert u.path == '/account/'
q = parse_qs(u.query)
assert 'cross_domain_customer_auth' not in q
@pytest.mark.django_db
@responses.activate
def test_cross_domain_login_with_sso(env, client, client2, provider):
with scopes_disabled():
KnownDomain.objects.create(domainname='org.test', organizer=env[0])
KnownDomain.objects.create(domainname='event.test', organizer=env[0], event=env[1])
# Log in on org domain
responses.reset()
responses.add(
responses.POST,
"https://example.com/token",
json={
'access_token': 'test_access_token',
},
)
responses.add(
responses.GET,
"https://example.com/userinfo",
json={
'sub': 'abcdf',
'email': 'john@example.org'
},
)
url = f'/account/login/{provider.pk}/?next=https://event.test/redeem&request_cross_domain_customer_auth=true'
r = client.get(url, follow=False, HTTP_HOST='org.test')
assert r.status_code == 302
assert "/authorize" in r['Location']
u = urlparse(r['Location'])
state = parse_qs(u.query)['state'][0]
r = client.get(f'/account/login/{provider.pk}/return?code=test_code&state={quote(state)}', HTTP_HOST='org.test')
assert r.status_code == 302
u = urlparse(r.headers['Location'])
assert u.netloc == 'event.test'
assert u.path == '/redeem'
q = parse_qs(u.query)
assert 'cross_domain_customer_auth' in q
# Take session over to event domain
r = client2.get(f'/?{u.query}', HTTP_HOST='event.test')
assert r.status_code == 200
assert b'john@example.org' in r.content
# Logged in on org domain
r = client.get('/', HTTP_HOST='event.test')
assert r.status_code == 200
assert b'john@example.org' in r.content
# Logged in on event domain
r = client2.get('/', HTTP_HOST='org.test')
assert r.status_code == 200
assert b'john@example.org' in r.content