API: Add RPC-style check-in endpoints to support multi-event scan (#2719)

This commit is contained in:
Raphael Michel
2022-07-19 16:43:03 +02:00
committed by GitHub
parent 748ea38e15
commit 0d1ebf4e58
17 changed files with 2012 additions and 550 deletions

View File

@@ -19,4 +19,4 @@
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
# <https://www.gnu.org/licenses/>.
#
__version__ = "4.12.0.dev0"
__version__ = "4.12.0.dev1"

View File

@@ -68,6 +68,8 @@ class PretixScanSecurityProfile(AllowListSecurityProfile):
('GET', 'api-v1:orderposition-pdf_image'),
('GET', 'api-v1:event.settings'),
('POST', 'api-v1:upload'),
('POST', 'api-v1:checkinrpc.redeem'),
('GET', 'api-v1:checkinrpc.search'),
)
@@ -98,6 +100,8 @@ class PretixScanNoSyncNoSearchSecurityProfile(AllowListSecurityProfile):
('GET', 'api-v1:orderposition-pdf_image'),
('GET', 'api-v1:event.settings'),
('POST', 'api-v1:upload'),
('POST', 'api-v1:checkinrpc.redeem'),
('GET', 'api-v1:checkinrpc.search'),
)
@@ -129,6 +133,8 @@ class PretixScanNoSyncSecurityProfile(AllowListSecurityProfile):
('GET', 'api-v1:orderposition-pdf_image'),
('GET', 'api-v1:event.settings'),
('POST', 'api-v1:upload'),
('POST', 'api-v1:checkinrpc.redeem'),
('GET', 'api-v1:checkinrpc.search'),
)
@@ -194,6 +200,8 @@ class PretixPosSecurityProfile(AllowListSecurityProfile):
('GET', 'plugins:pretix_seating:event.plan'),
('GET', 'plugins:pretix_seating:selection.simple'),
('POST', 'api-v1:upload'),
('POST', 'api-v1:checkinrpc.redeem'),
('GET', 'api-v1:checkinrpc.search'),
)

View File

@@ -26,7 +26,7 @@ from rest_framework.exceptions import ValidationError
from pretix.api.serializers.event import SubEventSerializer
from pretix.api.serializers.i18n import I18nAwareModelSerializer
from pretix.base.channels import get_all_sales_channels
from pretix.base.models import CheckinList
from pretix.base.models import Checkin, CheckinList
class CheckinListSerializer(I18nAwareModelSerializer):
@@ -78,3 +78,31 @@ class CheckinListSerializer(I18nAwareModelSerializer):
CheckinList.validate_rules(data.get('rules'))
return data
class CheckinRPCRedeemInputSerializer(serializers.Serializer):
lists = serializers.PrimaryKeyRelatedField(required=True, many=True, queryset=CheckinList.objects.none())
secret = serializers.CharField(required=True, allow_null=False)
force = serializers.BooleanField(default=False, required=False)
type = serializers.ChoiceField(choices=Checkin.CHECKIN_TYPES, default=Checkin.TYPE_ENTRY)
ignore_unpaid = serializers.BooleanField(default=False, required=False)
questions_supported = serializers.BooleanField(default=True, required=False)
nonce = serializers.CharField(required=False, allow_null=True)
datetime = serializers.DateTimeField(required=False, allow_null=True)
answers = serializers.JSONField(required=False, allow_null=True)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.fields['lists'].child_relation.queryset = CheckinList.objects.filter(event__in=self.context['events']).select_related('event')
class MiniCheckinListSerializer(I18nAwareModelSerializer):
event = serializers.SlugRelatedField(slug_field='slug', read_only=True)
subevent = serializers.PrimaryKeyRelatedField(read_only=True)
class Meta:
model = CheckinList
fields = ('id', 'name', 'event', 'subevent', 'include_pending')
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

View File

@@ -184,8 +184,9 @@ class ItemSerializer(I18nAwareModelSerializer):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.fields['require_membership_types'].queryset = self.context['event'].organizer.membership_types.all()
self.fields['grant_membership_type'].queryset = self.context['event'].organizer.membership_types.all()
if not self.read_only:
self.fields['require_membership_types'].queryset = self.context['event'].organizer.membership_types.all()
self.fields['grant_membership_type'].queryset = self.context['event'].organizer.membership_types.all()
def validate(self, data):
data = super().validate(data)

View File

@@ -341,10 +341,10 @@ class PdfDataSerializer(serializers.Field):
# we serialize a list.
if 'vars' not in self.context:
self.context['vars'] = get_variables(self.context['request'].event)
self.context['vars'] = get_variables(self.context['event'])
if 'vars_images' not in self.context:
self.context['vars_images'] = get_images(self.context['request'].event)
self.context['vars_images'] = get_images(self.context['event'])
for k, f in self.context['vars'].items():
try:
@@ -422,7 +422,14 @@ class OrderPositionSerializer(I18nAwareModelSerializer):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
request = self.context.get('request')
if request and (not request.query_params.get('pdf_data', 'false') == 'true' or 'can_view_orders' not in request.eventpermset):
pdf_data_allowed = (
# We check this based on permission if we are on /events/…/orders/ or /events/…/orderpositions/ or
# /events/…/checkinlists/…/positions/
# We're unable to check this on this level if we're on /checkinrpc/, in which case we rely on the view
# layer to not set pdf_data=true in the first place.
request and hasattr(request, 'event') and 'can_view_orders' not in request.eventpermset
)
if not self.context.get('pdf_data') or pdf_data_allowed:
self.fields.pop('pdf_data', None)
def validate(self, data):
@@ -481,13 +488,13 @@ class CheckinListOrderPositionSerializer(OrderPositionSerializer):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if 'subevent' in self.context['request'].query_params.getlist('expand'):
if 'subevent' in self.context['expand']:
self.fields['subevent'] = SubEventSerializer(read_only=True)
if 'item' in self.context['request'].query_params.getlist('expand'):
if 'item' in self.context['expand']:
self.fields['item'] = ItemSerializer(read_only=True, context=self.context)
if 'variation' in self.context['request'].query_params.getlist('expand'):
if 'variation' in self.context['expand']:
self.fields['variation'] = InlineItemVariationSerializer(read_only=True)
@@ -590,10 +597,10 @@ class OrderSerializer(I18nAwareModelSerializer):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if not self.context['request'].query_params.get('pdf_data', 'false') == 'true':
if not self.context['pdf_data']:
self.fields['positions'].child.fields.pop('pdf_data', None)
for exclude_field in self.context['request'].query_params.getlist('exclude'):
for exclude_field in self.context['exclude']:
p = exclude_field.split('.')
if p[0] in self.fields:
if len(p) == 1:

View File

@@ -112,6 +112,10 @@ for app in apps.get_app_configs():
urlpatterns = [
re_path(r'^', include(router.urls)),
re_path(r'^organizers/(?P<organizer>[^/]+)/', include(orga_router.urls)),
re_path(r'^organizers/(?P<organizer>[^/]+)/checkinrpc/redeem/$', checkin.CheckinRPCRedeemView.as_view(),
name="checkinrpc.redeem"),
re_path(r'^organizers/(?P<organizer>[^/]+)/checkinrpc/search/$', checkin.CheckinRPCSearchView.as_view(),
name="checkinrpc.search"),
re_path(r'^organizers/(?P<organizer>[^/]+)/settings/$', organizer.OrganizerSettingsView.as_view(),
name="organizer.settings"),
re_path(r'^organizers/(?P<organizer>[^/]+)/giftcards/(?P<giftcard>[^/]+)/', include(giftcard_router.urls)),

View File

@@ -19,9 +19,12 @@
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
# <https://www.gnu.org/licenses/>.
#
import operator
from functools import reduce
import django_filters
from django.conf import settings
from django.core.exceptions import ValidationError
from django.core.exceptions import ValidationError as BaseValidationError
from django.db import transaction
from django.db.models import (
Count, Exists, F, Max, OrderBy, OuterRef, Prefetch, Q, Subquery,
@@ -35,13 +38,18 @@ from django.utils.timezone import now
from django_filters.rest_framework import DjangoFilterBackend, FilterSet
from django_scopes import scopes_disabled
from packaging.version import parse
from rest_framework import viewsets
from rest_framework import views, viewsets
from rest_framework.decorators import action
from rest_framework.exceptions import PermissionDenied, ValidationError
from rest_framework.fields import DateTimeField
from rest_framework.generics import ListAPIView
from rest_framework.permissions import SAFE_METHODS
from rest_framework.response import Response
from pretix.api.serializers.checkin import CheckinListSerializer
from pretix.api.serializers.checkin import (
CheckinListSerializer, CheckinRPCRedeemInputSerializer,
MiniCheckinListSerializer,
)
from pretix.api.serializers.item import QuestionSerializer
from pretix.api.serializers.order import (
CheckinListOrderPositionSerializer, FailedCheckinSerializer,
@@ -51,7 +59,7 @@ from pretix.api.views.order import OrderPositionFilter
from pretix.base.i18n import language
from pretix.base.models import (
CachedFile, Checkin, CheckinList, Device, Event, Order, OrderPosition,
Question,
Question, RevokedTicketSecret, TeamAPIToken,
)
from pretix.base.services.checkin import (
CheckInError, RequiredQuestionsError, SQLLogic, perform_checkin,
@@ -266,6 +274,399 @@ with scopes_disabled():
return queryset.filter(SQLLogic(self.checkinlist).apply(self.checkinlist.rules))
def _handle_file_upload(data, user, auth):
try:
cf = CachedFile.objects.get(
session_key=f'api-upload-{str(type(user or auth))}-{(user or auth).pk}',
file__isnull=False,
pk=data[len("file:"):],
)
except (ValidationError, BaseValidationError, IndexError): # invalid uuid
raise ValidationError('The submitted file ID "{fid}" was not found.'.format(fid=data))
except CachedFile.DoesNotExist:
raise ValidationError('The submitted file ID "{fid}" was not found.'.format(fid=data))
allowed_types = (
'image/png', 'image/jpeg', 'image/gif', 'application/pdf'
)
if cf.type not in allowed_types:
raise ValidationError('The submitted file "{fid}" has a file type that is not allowed in this field.'.format(fid=data))
if cf.file.size > settings.FILE_UPLOAD_MAX_SIZE_OTHER:
raise ValidationError('The submitted file "{fid}" is too large to be used in this field.'.format(fid=data))
return cf.file
def _checkin_list_position_queryset(checkinlists, ignore_status=False, ignore_products=False, pdf_data=False, expand=None):
list_by_event = {cl.event_id: cl for cl in checkinlists}
if not checkinlists:
raise ValidationError('No check-in list passed.')
if len(list_by_event) != len(checkinlists):
raise ValidationError('Selecting two check-in lists from the same event is unsupported.')
cqs = Checkin.objects.filter(
position_id=OuterRef('pk'),
list_id__in=[cl.pk for cl in checkinlists]
).order_by().values('position_id').annotate(
m=Max('datetime')
).values('m')
qs = OrderPosition.objects.filter(
order__event__in=list_by_event.keys(),
).annotate(
last_checked_in=Subquery(cqs)
).prefetch_related('order__event', 'order__event__organizer')
lists_qs = []
for checkinlist in checkinlists:
list_q = Q(order__event_id=checkinlist.event_id)
if checkinlist.subevent:
list_q &= Q(subevent=checkinlist.subevent)
if not ignore_status:
list_q &= Q(order__status__in=[Order.STATUS_PAID, Order.STATUS_PENDING] if checkinlist.include_pending else [Order.STATUS_PAID])
if not checkinlist.all_products and not ignore_products:
list_q &= Q(item__in=checkinlist.limit_products.values_list('id', flat=True))
lists_qs.append(list_q)
qs = qs.filter(reduce(operator.or_, lists_qs))
if pdf_data:
qs = qs.prefetch_related(
Prefetch(
lookup='checkins',
queryset=Checkin.objects.filter(list_id__in=[cl.pk for cl in checkinlists])
),
'answers', 'answers__options', 'answers__question',
Prefetch('addons', OrderPosition.objects.select_related('item', 'variation')),
Prefetch('order', Order.objects.select_related('invoice_address').prefetch_related(
Prefetch(
'event',
Event.objects.select_related('organizer')
),
Prefetch(
'positions',
OrderPosition.objects.prefetch_related(
Prefetch('checkins', queryset=Checkin.objects.all()),
'item', 'variation', 'answers', 'answers__options', 'answers__question',
)
)
))
).select_related(
'item', 'variation', 'item__category', 'addon_to', 'order', 'order__invoice_address', 'seat'
)
else:
qs = qs.prefetch_related(
Prefetch(
lookup='checkins',
queryset=Checkin.objects.filter(list_id__in=[cl.pk for cl in checkinlists])
),
'answers', 'answers__options', 'answers__question',
Prefetch('addons', OrderPosition.objects.select_related('item', 'variation'))
).select_related('item', 'variation', 'order', 'addon_to', 'order__invoice_address', 'order', 'seat')
if expand and 'subevent' in expand:
qs = qs.prefetch_related(
'subevent', 'subevent__event', 'subevent__subeventitem_set', 'subevent__subeventitemvariation_set',
'subevent__seat_category_mappings', 'subevent__meta_values'
)
if expand and 'item' in expand:
qs = qs.prefetch_related('item', 'item__addons', 'item__bundles', 'item__meta_values',
'item__variations').select_related('item__tax_rule')
if expand and 'variation' in expand:
qs = qs.prefetch_related('variation')
return qs
def _redeem_process(*, checkinlists, raw_barcode, answers_data, datetime, force, checkin_type, ignore_unpaid, nonce,
untrusted_input, user, auth, expand, pdf_data, request, questions_supported, canceled_supported,
legacy_url_support=False):
if not checkinlists:
raise ValidationError('No check-in list passed.')
list_by_event = {cl.event_id: cl for cl in checkinlists}
prefetch_related_objects([cl for cl in checkinlists if not cl.all_products], 'limit_products')
device = auth if isinstance(auth, Device) else None
gate = auth.gate if isinstance(auth, Device) else None
context = {
'request': request,
'expand': expand,
}
def _make_context(context, event):
return {
**context,
'event': op.order.event,
'pdf_data': pdf_data and (
user if user and user.is_authenticated else auth
).has_event_permission(request.organizer, event, 'can_view_orders', request),
}
common_checkin_args = dict(
raw_barcode=raw_barcode,
type=checkin_type,
list=checkinlists[0],
datetime=datetime,
device=device,
gate=gate,
nonce=nonce,
forced=force,
)
raw_barcode_for_checkin = None
from_revoked_secret = False
# 1. Gather a list of positions that could be the one we looking fore, either from their ID, secret or
# parent secret
queryset = _checkin_list_position_queryset(checkinlists, pdf_data=pdf_data, ignore_status=True, ignore_products=True).order_by(
F('addon_to').asc(nulls_first=True)
)
q = Q(secret=raw_barcode)
if any(cl.addon_match for cl in checkinlists):
q |= Q(addon_to__secret=raw_barcode)
if raw_barcode.isnumeric() and not untrusted_input and legacy_url_support:
q |= Q(pk=raw_barcode)
op_candidates = list(queryset.filter(q))
if not op_candidates and '+' in raw_barcode and legacy_url_support:
# In application/x-www-form-urlencoded, you can encodes space ' ' with '+' instead of '%20'.
# `id`, however, is part of a path where this technically is not allowed. Old versions of our
# scan apps still do it, so we try work around it!
q = Q(secret=raw_barcode.replace('+', ' '))
if any(cl.addon_match for cl in checkinlists):
q |= Q(addon_to__secret=raw_barcode.replace('+', ' '))
op_candidates = list(queryset.filter(q))
# 2. Handle the "nothing found" case: Either it's really a bogus secret that we don't know (-> error), or it
# might be a revoked one that we actually know (-> error, but with better error message and logging and
# with respecting the force option).
if not op_candidates:
revoked_matches = list(RevokedTicketSecret.objects.filter(event_id__in=list_by_event.keys(), secret=raw_barcode))
if len(revoked_matches) == 0:
checkinlists[0].event.log_action('pretix.event.checkin.unknown', data={
'datetime': datetime,
'type': checkin_type,
'list': checkinlists[0].pk,
'barcode': raw_barcode,
'searched_lists': [cl.pk for cl in checkinlists]
}, user=user, auth=auth)
for cl in checkinlists:
for k, s in cl.event.ticket_secret_generators.items():
try:
parsed = s.parse_secret(raw_barcode)
common_checkin_args.update({
'raw_item': parsed.item,
'raw_variation': parsed.variation,
'raw_subevent': parsed.subevent,
})
except:
pass
Checkin.objects.create(
position=None,
successful=False,
error_reason=Checkin.REASON_INVALID,
**common_checkin_args,
)
if force and legacy_url_support and isinstance(auth, Device):
# There was a bug in libpretixsync: If you scanned a ticket in offline mode that was
# valid at the time but no longer exists at time of upload, the device would retry to
# upload the same scan over and over again. Since we can't update all devices quickly,
# here's a dirty workaround to make it stop.
try:
brand = auth.software_brand
ver = parse(auth.software_version)
legacy_mode = (
(brand == 'pretixSCANPROXY' and ver < parse('0.0.3')) or
(brand == 'pretixSCAN Android' and ver < parse('1.11.2')) or
(brand == 'pretixSCAN' and ver < parse('1.11.2'))
)
if legacy_mode:
return Response({
'status': 'error',
'reason': Checkin.REASON_ALREADY_REDEEMED,
'reason_explanation': None,
'require_attention': False,
'__warning': 'Compatibility hack active due to detected old pretixSCAN version',
}, status=400)
except: # we don't care e.g. about invalid version numbers
pass
return Response({
'detail': 'Not found.', # for backwards compatibility
'status': 'error',
'reason': Checkin.REASON_INVALID,
'reason_explanation': None,
'require_attention': False,
'list': MiniCheckinListSerializer(checkinlists[0]).data,
}, status=404)
elif revoked_matches and force:
op_candidates = [revoked_matches[0].position]
if list_by_event[revoked_matches[0].event_id].addon_match:
op_candidates += list(revoked_matches[0].position.addons.all())
raw_barcode_for_checkin = raw_barcode
from_revoked_secret = True
else:
op = revoked_matches[0].position
op.order.log_action('pretix.event.checkin.revoked', data={
'datetime': datetime,
'type': checkin_type,
'list': list_by_event[revoked_matches[0].event_id].pk,
'barcode': raw_barcode
}, user=user, auth=auth)
common_checkin_args['list'] = list_by_event[revoked_matches[0].event_id]
Checkin.objects.create(
position=op,
successful=False,
error_reason=Checkin.REASON_REVOKED,
**common_checkin_args
)
return Response({
'status': 'error',
'reason': Checkin.REASON_REVOKED,
'reason_explanation': None,
'require_attention': False,
'position': CheckinListOrderPositionSerializer(op, context=_make_context(context, revoked_matches[0].event)).data,
'list': MiniCheckinListSerializer(list_by_event[revoked_matches[0].event_id]).data,
}, status=400)
# 3. Handle the "multiple options found" case: Except for the unlikely case of a secret being also a valid primary
# key on the same list, we're probably dealing with the ``addon_match`` case here and need to figure out
# which add-on has the right product.
if len(op_candidates) > 1:
op_candidates_matching_product = [
op for op in op_candidates
if (
(list_by_event[op.order.event_id].addon_match or op.secret == raw_barcode or legacy_url_support) and
(list_by_event[op.order.event_id].all_products or op.item_id in {i.pk for i in list_by_event[op.order.event_id].limit_products.all()})
)
]
if len(op_candidates_matching_product) == 0:
# None of the found add-ons has the correct product, too bad! We could just error out here, but
# instead we just continue with *any* product and have it rejected by the check in perform_checkin.
# This has the advantage of a better error message.
op_candidates = [op_candidates[0]]
elif len(op_candidates_matching_product) > 1:
# It's still ambiguous, we'll error out.
# We choose the first match (regardless of product) for the logging since it's most likely to be the
# base product according to our order_by above.
op = op_candidates[0]
op.order.log_action('pretix.event.checkin.denied', data={
'position': op.id,
'positionid': op.positionid,
'errorcode': Checkin.REASON_AMBIGUOUS,
'reason_explanation': None,
'force': force,
'datetime': datetime,
'type': checkin_type,
'list': list_by_event[op.order.event_id].pk,
}, user=user, auth=auth)
common_checkin_args['list'] = list_by_event[op.order.event_id]
Checkin.objects.create(
position=op,
successful=False,
error_reason=Checkin.REASON_AMBIGUOUS,
error_explanation=None,
**common_checkin_args,
)
return Response({
'status': 'error',
'reason': Checkin.REASON_AMBIGUOUS,
'reason_explanation': None,
'require_attention': op.item.checkin_attention or op.order.checkin_attention,
'position': CheckinListOrderPositionSerializer(op, context=_make_context(context, op.order.event)).data,
'list': MiniCheckinListSerializer(list_by_event[op.order.event_id]).data,
}, status=400)
else:
op_candidates = op_candidates_matching_product
op = op_candidates[0]
common_checkin_args['list'] = list_by_event[op.order.event_id]
# 5. Pre-validate all incoming answers, handle file upload
given_answers = {}
if answers_data:
for q in op.item.questions.filter(ask_during_checkin=True):
if str(q.pk) in answers_data:
try:
if q.type == Question.TYPE_FILE:
given_answers[q] = _handle_file_upload(answers_data[str(q.pk)], user, auth)
else:
given_answers[q] = q.clean_answer(answers_data[str(q.pk)])
except (ValidationError, BaseValidationError):
pass
# 6. Pass to our actual check-in logic
with language(op.order.event.settings.locale):
try:
perform_checkin(
op=op,
clist=list_by_event[op.order.event_id],
given_answers=given_answers,
force=force,
ignore_unpaid=ignore_unpaid,
nonce=nonce,
datetime=datetime,
questions_supported=questions_supported,
canceled_supported=canceled_supported,
user=user,
auth=auth,
type=checkin_type,
raw_barcode=raw_barcode_for_checkin,
from_revoked_secret=from_revoked_secret,
)
except RequiredQuestionsError as e:
return Response({
'status': 'incomplete',
'require_attention': op.item.checkin_attention or op.order.checkin_attention,
'position': CheckinListOrderPositionSerializer(op, context=_make_context(context, op.order.event)).data,
'questions': [
QuestionSerializer(q).data for q in e.questions
],
'list': MiniCheckinListSerializer(list_by_event[op.order.event_id]).data,
}, status=400)
except CheckInError as e:
op.order.log_action('pretix.event.checkin.denied', data={
'position': op.id,
'positionid': op.positionid,
'errorcode': e.code,
'reason_explanation': e.reason,
'force': force,
'datetime': datetime,
'type': checkin_type,
'list': list_by_event[op.order.event_id].pk,
}, user=user, auth=auth)
Checkin.objects.create(
position=op,
successful=False,
error_reason=e.code,
error_explanation=e.reason,
**common_checkin_args,
)
return Response({
'status': 'error',
'reason': e.code,
'reason_explanation': e.reason,
'require_attention': op.item.checkin_attention or op.order.checkin_attention,
'position': CheckinListOrderPositionSerializer(op, context=_make_context(context, op.order.event)).data,
'list': MiniCheckinListSerializer(list_by_event[op.order.event_id]).data,
}, status=400)
else:
return Response({
'status': 'ok',
'require_attention': op.item.checkin_attention or op.order.checkin_attention,
'position': CheckinListOrderPositionSerializer(op, context=_make_context(context, op.order.event)).data,
'list': MiniCheckinListSerializer(list_by_event[op.order.event_id]).data,
}, status=201)
class ExtendedBackend(DjangoFilterBackend):
def get_filterset_kwargs(self, request, queryset, view):
kwargs = super().get_filterset_kwargs(request, queryset, view)
@@ -310,6 +711,8 @@ class CheckinListPositionViewSet(viewsets.ReadOnlyModelViewSet):
def get_serializer_context(self):
ctx = super().get_serializer_context()
ctx['event'] = self.request.event
ctx['expand'] = self.request.query_params.getlist('expand')
ctx['pdf_data'] = self.request.query_params.get('pdf_data', 'false') == 'true'
return ctx
def get_filterset_kwargs(self):
@@ -320,80 +723,18 @@ class CheckinListPositionViewSet(viewsets.ReadOnlyModelViewSet):
@cached_property
def checkinlist(self):
try:
return get_object_or_404(CheckinList, event=self.request.event, pk=self.kwargs.get("list"))
return get_object_or_404(self.request.event.checkin_lists, pk=self.kwargs.get("list"))
except ValueError:
raise Http404()
def get_queryset(self, ignore_status=False, ignore_products=False):
cqs = Checkin.objects.filter(
position_id=OuterRef('pk'),
list_id=self.checkinlist.pk
).order_by().values('position_id').annotate(
m=Max('datetime')
).values('m')
qs = OrderPosition.objects.filter(
order__event=self.request.event,
).annotate(
last_checked_in=Subquery(cqs)
).prefetch_related('order__event', 'order__event__organizer')
if self.checkinlist.subevent:
qs = qs.filter(
subevent=self.checkinlist.subevent
)
if self.request.query_params.get('ignore_status', 'false') != 'true' and not ignore_status:
qs = qs.filter(
order__status__in=[Order.STATUS_PAID, Order.STATUS_PENDING] if self.checkinlist.include_pending else [Order.STATUS_PAID]
)
if self.request.query_params.get('pdf_data', 'false') == 'true':
qs = qs.prefetch_related(
Prefetch(
lookup='checkins',
queryset=Checkin.objects.filter(list_id=self.checkinlist.pk)
),
'answers', 'answers__options', 'answers__question',
Prefetch('addons', OrderPosition.objects.select_related('item', 'variation')),
Prefetch('order', Order.objects.select_related('invoice_address').prefetch_related(
Prefetch(
'event',
Event.objects.select_related('organizer')
),
Prefetch(
'positions',
OrderPosition.objects.prefetch_related(
Prefetch('checkins', queryset=Checkin.objects.all()),
'item', 'variation', 'answers', 'answers__options', 'answers__question',
)
)
))
).select_related(
'item', 'variation', 'item__category', 'addon_to', 'order', 'order__invoice_address', 'seat'
)
else:
qs = qs.prefetch_related(
Prefetch(
lookup='checkins',
queryset=Checkin.objects.filter(list_id=self.checkinlist.pk)
),
'answers', 'answers__options', 'answers__question',
Prefetch('addons', OrderPosition.objects.select_related('item', 'variation'))
).select_related('item', 'variation', 'order', 'addon_to', 'order__invoice_address', 'order', 'seat')
if not self.checkinlist.all_products and not ignore_products:
qs = qs.filter(item__in=self.checkinlist.limit_products.values_list('id', flat=True))
if 'subevent' in self.request.query_params.getlist('expand'):
qs = qs.prefetch_related(
'subevent', 'subevent__event', 'subevent__subeventitem_set', 'subevent__subeventitemvariation_set',
'subevent__seat_category_mappings', 'subevent__meta_values'
)
if 'item' in self.request.query_params.getlist('expand'):
qs = qs.prefetch_related('item', 'item__addons', 'item__bundles', 'item__meta_values', 'item__variations').select_related('item__tax_rule')
if 'variation' in self.request.query_params.getlist('expand'):
qs = qs.prefetch_related('variation')
qs = _checkin_list_position_queryset(
[self.checkinlist],
ignore_status=self.request.query_params.get('ignore_status', 'false') == 'true' or ignore_status,
ignore_products=ignore_products,
pdf_data=self.request.query_params.get('pdf_data', 'false') == 'true',
expand=self.request.query_params.getlist('expand'),
)
if 'pk' not in self.request.resolver_match.kwargs and 'can_view_orders' not in self.request.eventpermset \
and len(self.request.query_params.get('search', '')) < 3:
@@ -404,8 +745,8 @@ class CheckinListPositionViewSet(viewsets.ReadOnlyModelViewSet):
@action(detail=False, methods=['POST'], url_name='redeem', url_path='(?P<pk>.*)/redeem')
def redeem(self, *args, **kwargs):
force = bool(self.request.data.get('force', False))
type = self.request.data.get('type', None) or Checkin.TYPE_ENTRY
if type not in dict(Checkin.CHECKIN_TYPES):
checkin_type = self.request.data.get('type', None) or Checkin.TYPE_ENTRY
if checkin_type not in dict(Checkin.CHECKIN_TYPES):
raise ValidationError("Invalid check-in type.")
ignore_unpaid = bool(self.request.data.get('ignore_unpaid', False))
nonce = self.request.data.get('nonce')
@@ -414,280 +755,143 @@ class CheckinListPositionViewSet(viewsets.ReadOnlyModelViewSet):
or (isinstance(self.request.auth, Device) and 'pretixscan' in (self.request.auth.software_brand or '').lower())
)
if not self.checkinlist.all_products:
prefetch_related_objects([self.checkinlist], 'limit_products')
if 'datetime' in self.request.data:
dt = DateTimeField().to_internal_value(self.request.data.get('datetime'))
else:
dt = now()
common_checkin_args = dict(
raw_barcode=self.kwargs['pk'],
type=type,
list=self.checkinlist,
answers_data = self.request.data.get('answers')
return _redeem_process(
checkinlists=[self.checkinlist],
raw_barcode=kwargs['pk'],
answers_data=answers_data,
datetime=dt,
device=self.request.auth if isinstance(self.request.auth, Device) else None,
gate=self.request.auth.gate if isinstance(self.request.auth, Device) else None,
force=force,
checkin_type=checkin_type,
ignore_unpaid=ignore_unpaid,
nonce=nonce,
forced=force,
)
raw_barcode_for_checkin = None
from_revoked_secret = False
# 1. Gather a list of positions that could be the one we looking fore, either from their ID, secret or
# parent secret
queryset = self.get_queryset(ignore_status=True, ignore_products=True).order_by(
F('addon_to').asc(nulls_first=True)
untrusted_input=untrusted_input,
user=self.request.user,
auth=self.request.auth,
expand=self.request.query_params.getlist('expand'),
pdf_data=self.request.query_params.get('pdf_data', 'false') == 'true',
questions_supported=self.request.data.get('questions_supported', True),
canceled_supported=self.request.data.get('canceled_supported', False),
request=self.request, # this is not clean, but we need it in the serializers for URL generation
legacy_url_support=True,
)
q = Q(secret=self.kwargs['pk'])
if self.checkinlist.addon_match:
q |= Q(addon_to__secret=self.kwargs['pk'])
if self.kwargs['pk'].isnumeric() and not untrusted_input:
q |= Q(pk=self.kwargs['pk'])
op_candidates = list(queryset.filter(q))
if not op_candidates and '+' in self.kwargs['pk']:
# In application/x-www-form-urlencoded, you can encodes space ' ' with '+' instead of '%20'.
# `id`, however, is part of a path where this technically is not allowed. Old versions of our
# scan apps still do it, so we try work around it!
q = Q(secret=self.kwargs['pk'].replace('+', ' '))
if self.checkinlist.addon_match:
q |= Q(addon_to__secret=self.kwargs['pk'].replace('+', ' '))
op_candidates = list(queryset.filter(q))
# 2. Handle the "nothing found" case: Either it's really a bogus secret that we don't know (-> error), or it
# might be a revoked one that we actually know (-> error, but with better error message and logging and
# with respecting the force option).
if not op_candidates:
revoked_matches = list(self.request.event.revoked_secrets.filter(secret=self.kwargs['pk']))
if len(revoked_matches) == 0:
self.request.event.log_action('pretix.event.checkin.unknown', data={
'datetime': dt,
'type': type,
'list': self.checkinlist.pk,
'barcode': self.kwargs['pk']
}, user=self.request.user, auth=self.request.auth)
for k, s in self.request.event.ticket_secret_generators.items():
try:
parsed = s.parse_secret(self.kwargs['pk'])
common_checkin_args.update({
'raw_item': parsed.item,
'raw_variation': parsed.variation,
'raw_subevent': parsed.subevent,
})
except:
pass
Checkin.objects.create(
position=None,
successful=False,
error_reason=Checkin.REASON_INVALID,
**common_checkin_args,
)
if force and isinstance(self.request.auth, Device):
# There was a bug in libpretixsync: If you scanned a ticket in offline mode that was
# valid at the time but no longer exists at time of upload, the device would retry to
# upload the same scan over and over again. Since we can't update all devices quickly,
# here's a dirty workaround to make it stop.
try:
brand = self.request.auth.software_brand
ver = parse(self.request.auth.software_version)
legacy_mode = (
(brand == 'pretixSCANPROXY' and ver < parse('0.0.3')) or
(brand == 'pretixSCAN Android' and ver < parse('1.11.2')) or
(brand == 'pretixSCAN' and ver < parse('1.11.2'))
)
if legacy_mode:
return Response({
'status': 'error',
'reason': Checkin.REASON_ALREADY_REDEEMED,
'reason_explanation': None,
'require_attention': False,
'__warning': 'Compatibility hack active due to detected old pretixSCAN version',
}, status=400)
except: # we don't care e.g. about invalid version numbers
pass
return Response({
'detail': 'Not found.', # for backwards compatibility
'status': 'error',
'reason': Checkin.REASON_INVALID,
'reason_explanation': None,
'require_attention': False,
}, status=404)
elif revoked_matches and force:
op_candidates = [revoked_matches[0].position]
if self.checkinlist.addon_match:
op_candidates += list(revoked_matches[0].position.addons.all())
raw_barcode_for_checkin = self.kwargs['pk']
from_revoked_secret = True
else:
op = revoked_matches[0].position
op.order.log_action('pretix.event.checkin.revoked', data={
'datetime': dt,
'type': type,
'list': self.checkinlist.pk,
'barcode': self.kwargs['pk']
}, user=self.request.user, auth=self.request.auth)
Checkin.objects.create(
position=op,
successful=False,
error_reason=Checkin.REASON_REVOKED,
**common_checkin_args
)
return Response({
'status': 'error',
'reason': Checkin.REASON_REVOKED,
'reason_explanation': None,
'require_attention': False,
'position': CheckinListOrderPositionSerializer(op, context=self.get_serializer_context()).data
}, status=400)
# 3. Handle the "multiple options found" case: Except for the unlikely case of a secret being also a valid primary
# key on the same list, we're probably dealing with the ``addon_match`` case here and need to figure out
# which add-on has the right product.
if len(op_candidates) > 1:
if self.checkinlist.addon_match and not self.checkinlist.all_products:
op_candidates_matching_product = [
op for op in op_candidates if op.item_id in {i.pk for i in self.checkinlist.limit_products.all()}
]
else:
op_candidates_matching_product = op_candidates
if len(op_candidates_matching_product) == 0:
# None of the found add-ons has the correct product, too bad! We could just error out here, but
# instead we just continue with *any* product and have it rejected by the check in perform_checkin.
# This has the advantage of a better error message.
op_candidates = [op_candidates[0]]
elif len(op_candidates_matching_product) > 1:
# It's still ambiguous, we'll error out.
# We choose the first match (regardless of product) for the logging since it's most likely to be the
# base product according to our order_by above.
op = op_candidates[0]
op.order.log_action('pretix.event.checkin.denied', data={
'position': op.id,
'positionid': op.positionid,
'errorcode': Checkin.REASON_AMBIGUOUS,
'reason_explanation': None,
'force': force,
'datetime': dt,
'type': type,
'list': self.checkinlist.pk
}, user=self.request.user, auth=self.request.auth)
Checkin.objects.create(
position=op,
successful=False,
error_reason=Checkin.REASON_AMBIGUOUS,
error_explanation=None,
**common_checkin_args,
)
return Response({
'status': 'error',
'reason': Checkin.REASON_AMBIGUOUS,
'reason_explanation': None,
'require_attention': op.item.checkin_attention or op.order.checkin_attention,
'position': CheckinListOrderPositionSerializer(op, context=self.get_serializer_context()).data
}, status=400)
else:
op_candidates = op_candidates_matching_product
op = op_candidates[0]
# 5. Pre-validate all incoming answers, handle file upload
given_answers = {}
if 'answers' in self.request.data:
aws = self.request.data.get('answers')
for q in op.item.questions.filter(ask_during_checkin=True):
if str(q.pk) in aws:
try:
if q.type == Question.TYPE_FILE:
given_answers[q] = self._handle_file_upload(aws[str(q.pk)])
else:
given_answers[q] = q.clean_answer(aws[str(q.pk)])
except ValidationError:
pass
# 6. Pass to our actual check-in logic
with language(self.request.event.settings.locale):
try:
perform_checkin(
op=op,
clist=self.checkinlist,
given_answers=given_answers,
force=force,
ignore_unpaid=ignore_unpaid,
nonce=nonce,
datetime=dt,
questions_supported=self.request.data.get('questions_supported', True),
canceled_supported=self.request.data.get('canceled_supported', False),
user=self.request.user,
auth=self.request.auth,
type=type,
raw_barcode=raw_barcode_for_checkin,
from_revoked_secret=from_revoked_secret,
)
except RequiredQuestionsError as e:
return Response({
'status': 'incomplete',
'require_attention': op.item.checkin_attention or op.order.checkin_attention,
'position': CheckinListOrderPositionSerializer(op, context=self.get_serializer_context()).data,
'questions': [
QuestionSerializer(q).data for q in e.questions
]
}, status=400)
except CheckInError as e:
op.order.log_action('pretix.event.checkin.denied', data={
'position': op.id,
'positionid': op.positionid,
'errorcode': e.code,
'reason_explanation': e.reason,
'force': force,
'datetime': dt,
'type': type,
'list': self.checkinlist.pk
}, user=self.request.user, auth=self.request.auth)
Checkin.objects.create(
position=op,
successful=False,
error_reason=e.code,
error_explanation=e.reason,
**common_checkin_args,
)
return Response({
'status': 'error',
'reason': e.code,
'reason_explanation': e.reason,
'require_attention': op.item.checkin_attention or op.order.checkin_attention,
'position': CheckinListOrderPositionSerializer(op, context=self.get_serializer_context()).data
}, status=400)
else:
return Response({
'status': 'ok',
'require_attention': op.item.checkin_attention or op.order.checkin_attention,
'position': CheckinListOrderPositionSerializer(op, context=self.get_serializer_context()).data
}, status=201)
def _handle_file_upload(self, data):
try:
cf = CachedFile.objects.get(
session_key=f'api-upload-{str(type(self.request.user or self.request.auth))}-{(self.request.user or self.request.auth).pk}',
file__isnull=False,
pk=data[len("file:"):],
class CheckinRPCRedeemView(views.APIView):
def post(self, request, *args, **kwargs):
if isinstance(self.request.auth, (TeamAPIToken, Device)):
events = self.request.auth.get_events_with_permission(('can_change_orders', 'can_checkin_orders'))
elif self.request.user.is_authenticated:
events = self.request.user.get_events_with_permission(('can_change_orders', 'can_checkin_orders'), self.request).filter(
organizer=self.request.organizer
)
except (ValidationError, IndexError): # invalid uuid
raise ValidationError('The submitted file ID "{fid}" was not found.'.format(fid=data))
except CachedFile.DoesNotExist:
raise ValidationError('The submitted file ID "{fid}" was not found.'.format(fid=data))
else:
raise ValueError("unknown authentication method")
allowed_types = (
'image/png', 'image/jpeg', 'image/gif', 'application/pdf'
s = CheckinRPCRedeemInputSerializer(data=request.data, context={'events': events})
s.is_valid(raise_exception=True)
return _redeem_process(
checkinlists=s.validated_data['lists'],
raw_barcode=s.validated_data['secret'],
answers_data=s.validated_data.get('answers'),
datetime=s.validated_data.get('datetime') or now(),
force=s.validated_data['force'],
checkin_type=s.validated_data['type'],
ignore_unpaid=s.validated_data['ignore_unpaid'],
nonce=s.validated_data.get('nonce'),
untrusted_input=True,
user=self.request.user,
auth=self.request.auth,
expand=self.request.query_params.getlist('expand'),
pdf_data=self.request.query_params.get('pdf_data', 'false') == 'true',
questions_supported=s.validated_data['questions_supported'],
canceled_supported=True,
request=self.request, # this is not clean, but we need it in the serializers for URL generation
legacy_url_support=False,
)
if cf.type not in allowed_types:
raise ValidationError('The submitted file "{fid}" has a file type that is not allowed in this field.'.format(fid=data))
if cf.file.size > settings.FILE_UPLOAD_MAX_SIZE_OTHER:
raise ValidationError('The submitted file "{fid}" is too large to be used in this field.'.format(fid=data))
return cf.file
class CheckinRPCSearchView(ListAPIView):
serializer_class = CheckinListOrderPositionSerializer
queryset = OrderPosition.all.none()
filter_backends = (ExtendedBackend, RichOrderingFilter)
ordering = (F('attendee_name_cached').asc(nulls_last=True), 'positionid')
ordering_fields = (
'order__code', 'order__datetime', 'positionid', 'attendee_name',
'last_checked_in', 'order__email',
)
ordering_custom = {
'attendee_name': {
'_order': F('display_name').asc(nulls_first=True),
'display_name': Coalesce('attendee_name_cached', 'addon_to__attendee_name_cached')
},
'-attendee_name': {
'_order': F('display_name').desc(nulls_last=True),
'display_name': Coalesce('attendee_name_cached', 'addon_to__attendee_name_cached')
},
'last_checked_in': {
'_order': OrderBy(F('last_checked_in'), nulls_first=True),
},
'-last_checked_in': {
'_order': OrderBy(F('last_checked_in'), nulls_last=True, descending=True),
},
}
filterset_class = OrderPositionFilter
def get_serializer_context(self):
ctx = super().get_serializer_context()
ctx['expand'] = self.request.query_params.getlist('expand')
ctx['pdf_data'] = False
return ctx
@cached_property
def lists(self):
if isinstance(self.request.auth, (TeamAPIToken, Device)):
events = self.request.auth.get_events_with_permission(('can_view_orders', 'can_checkin_orders'))
elif self.request.user.is_authenticated:
events = self.request.user.get_events_with_permission(('can_view_orders', 'can_checkin_orders'), self.request).filter(
organizer=self.request.organizer
)
else:
raise ValueError("unknown authentication method")
requested_lists = [int(l) for l in self.request.query_params.getlist('list') if l.isdigit()]
lists = list(
CheckinList.objects.filter(event__in=events).select_related('event').filter(id__in=requested_lists)
)
if len(lists) != len(requested_lists):
missing_lists = set(requested_lists) - {l.pk for l in lists}
raise PermissionDenied("You requested lists that do not exist or that you do not have access to: " + ", ".join(str(l) for l in missing_lists))
return lists
@cached_property
def has_full_access_permission(self):
if isinstance(self.request.auth, (TeamAPIToken, Device)):
events = self.request.auth.get_events_with_permission('can_view_orders')
elif self.request.user.is_authenticated:
events = self.request.user.get_events_with_permission('can_view_orders', self.request).filter(
organizer=self.request.organizer
)
else:
raise ValueError("unknown authentication method")
full_access_lists = CheckinList.objects.filter(event__in=events).filter(id__in=[c.pk for c in self.lists]).count()
return len(self.lists) == full_access_lists
def get_queryset(self, ignore_status=False, ignore_products=False):
qs = _checkin_list_position_queryset(
self.lists,
ignore_status=self.request.query_params.get('ignore_status', 'false') == 'true' or ignore_status,
ignore_products=ignore_products,
pdf_data=self.request.query_params.get('pdf_data', 'false') == 'true',
expand=self.request.query_params.getlist('expand'),
)
if len(self.request.query_params.get('search', '')) < 3 and not self.has_full_access_permission:
qs = qs.none()
return qs

View File

@@ -187,6 +187,8 @@ class OrderViewSet(viewsets.ModelViewSet):
def get_serializer_context(self):
ctx = super().get_serializer_context()
ctx['event'] = self.request.event
ctx['pdf_data'] = self.request.query_params.get('pdf_data', 'false') == 'true'
ctx['exclude'] = self.request.query_params.getlist('exclude')
return ctx
def get_queryset(self):
@@ -932,6 +934,7 @@ class OrderPositionViewSet(viewsets.ModelViewSet):
def get_serializer_context(self):
ctx = super().get_serializer_context()
ctx['event'] = self.request.event
ctx['pdf_data'] = self.request.query_params.get('pdf_data', 'false') == 'true'
return ctx
def get_queryset(self):

View File

@@ -34,7 +34,9 @@
import binascii
import json
import operator
from datetime import timedelta
from functools import reduce
from urllib.parse import urlparse
import webauthn
@@ -491,11 +493,14 @@ class User(AbstractBaseUser, PermissionsMixin, LoggingMixin):
if request and self.has_active_staff_session(request.session.session_key):
return Event.objects.all()
kwargs = {permission: True}
if isinstance(permission, (tuple, list)):
q = reduce(operator.or_, [Q(**{p: True}) for p in permission])
else:
q = Q(**{permission: True})
return Event.objects.filter(
Q(organizer_id__in=self.teams.filter(all_events=True, **kwargs).values_list('organizer', flat=True))
| Q(id__in=self.teams.filter(**kwargs).values_list('limit_events__id', flat=True))
Q(organizer_id__in=self.teams.filter(q, all_events=True).values_list('organizer', flat=True))
| Q(id__in=self.teams.filter(q).values_list('limit_events__id', flat=True))
)
@scopes_disabled()

View File

@@ -255,7 +255,9 @@ class Device(LoggedModel):
:param request: Ignored, for compatibility with User model
:return: Iterable of Events
"""
if permission in self.permission_set():
if (
isinstance(permission, (list, tuple)) and any(p in self.permission_set() for p in permission)
) or (isinstance(permission, str) and permission in self.permission_set()):
return self.get_events_with_any_permission()
else:
return self.organizer.events.none()

View File

@@ -461,7 +461,9 @@ class TeamAPIToken(models.Model):
:param request: Ignored, for compatibility with User model
:return: Iterable of Events
"""
if getattr(self.team, permission, False):
if (
isinstance(permission, (list, tuple)) and any(getattr(self.team, p, False) for p in permission)
) or (isinstance(permission, str) and getattr(self.team, permission, False)):
return self.get_events_with_any_permission()
else:
return self.team.organizer.events.none()

View File

@@ -229,13 +229,14 @@ def clist_all(event, item):
@pytest.mark.django_db
def test_list_list(token_client, organizer, event, clist, item, subevent):
def test_list_list(token_client, organizer, event, clist, item, subevent, django_assert_num_queries):
res = dict(TEST_LIST_RES)
res["id"] = clist.pk
res["limit_products"] = [item.pk]
res["auto_checkin_sales_channels"] = []
resp = token_client.get('/api/v1/organizers/{}/events/{}/checkinlists/'.format(organizer.slug, event.slug))
with django_assert_num_queries(11):
resp = token_client.get('/api/v1/organizers/{}/events/{}/checkinlists/'.format(organizer.slug, event.slug))
assert resp.status_code == 200
assert [res] == resp.data['results']
@@ -436,7 +437,7 @@ def test_list_update(token_client, organizer, event, clist):
@pytest.mark.django_db
def test_list_all_items_positions(token_client, organizer, event, clist, clist_all, item, other_item, order):
def test_list_all_items_positions(token_client, organizer, event, clist, clist_all, item, other_item, order, django_assert_num_queries):
with scopes_disabled():
p1 = dict(TEST_ORDERPOSITION1_RES)
p1["id"] = order.positions.get(positionid=1).pk
@@ -450,9 +451,10 @@ def test_list_all_items_positions(token_client, organizer, event, clist, clist_a
p3["addon_to"] = p1["id"]
# All items
resp = token_client.get('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/?ordering=positionid'.format(
organizer.slug, event.slug, clist_all.pk
))
with django_assert_num_queries(23):
resp = token_client.get('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/?ordering=positionid'.format(
organizer.slug, event.slug, clist_all.pk
))
assert resp.status_code == 200
assert [p1, p2, p3] == resp.data['results']
@@ -684,17 +686,31 @@ def test_status(token_client, organizer, event, clist_all, item, other_item, ord
]
def _redeem(token_client, org, clist, p, body=None):
return token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
org.slug, clist.event.slug, clist.pk, p
), body or {}, format='json')
@pytest.mark.django_db
def test_query_load(token_client, organizer, clist, event, order, django_assert_max_num_queries):
with scopes_disabled():
p = order.positions.first().pk
with django_assert_max_num_queries(30):
resp = _redeem(token_client, organizer, clist, p)
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
@pytest.mark.django_db
def test_custom_datetime(token_client, organizer, clist, event, order):
dt = now() - datetime.timedelta(days=1)
dt = dt.replace(microsecond=0)
with scopes_disabled():
p = order.positions.first().pk
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p
), {
resp = _redeem(token_client, organizer, clist, p, {
'datetime': dt.isoformat()
}, format='json')
})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
with scopes_disabled():
@@ -710,9 +726,7 @@ def test_name_fallback(token_client, organizer, clist, event, order):
op.attendee_name_cached = None
op.attendee_name_parts = {}
op.save()
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, op.pk
), {}, format='json')
resp = _redeem(token_client, organizer, clist, op.pk, {})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
assert resp.data['position']['attendee_name'] == 'Paul'
@@ -723,9 +737,7 @@ def test_name_fallback(token_client, organizer, clist, event, order):
def test_by_secret(token_client, organizer, clist, event, order):
with scopes_disabled():
p = order.positions.first()
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.secret
), {}, format='json')
resp = _redeem(token_client, organizer, clist, p.secret, {})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
@@ -736,9 +748,7 @@ def test_by_secret_special_chars(token_client, organizer, clist, event, order):
p = order.positions.first()
p.secret = "abc+-/=="
p.save()
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, urlquote(p.secret, safe='')
), {}, format='json')
resp = _redeem(token_client, organizer, clist, urlquote(p.secret, safe=''), {})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
@@ -749,9 +759,7 @@ def test_by_secret_special_chars_space_fallback(token_client, organizer, clist,
p = order.positions.first()
p.secret = "foo bar"
p.save()
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, "foo+bar"
), {}, format='json')
resp = _redeem(token_client, organizer, clist, "foo+bar", {})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
@@ -760,14 +768,10 @@ def test_by_secret_special_chars_space_fallback(token_client, organizer, clist,
def test_only_once(token_client, organizer, clist, event, order):
with scopes_disabled():
p = order.positions.first()
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {}, format='json')
resp = _redeem(token_client, organizer, clist, p.pk, {})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {}, format='json')
resp = _redeem(token_client, organizer, clist, p.pk, {})
assert resp.status_code == 400
assert resp.data['status'] == 'error'
assert resp.data['reason'] == 'already_redeemed'
@@ -777,14 +781,10 @@ def test_only_once(token_client, organizer, clist, event, order):
def test_reupload_same_nonce(token_client, organizer, clist, event, order):
with scopes_disabled():
p = order.positions.first()
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {'nonce': 'foobar'}, format='json')
resp = _redeem(token_client, organizer, clist, p.pk, {'nonce': 'foobar'})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {'nonce': 'foobar'}, format='json')
resp = _redeem(token_client, organizer, clist, p.pk, {'nonce': 'foobar'})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
@@ -795,14 +795,10 @@ def test_allow_multiple(token_client, organizer, clist, event, order):
clist.save()
with scopes_disabled():
p = order.positions.first()
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {}, format='json')
resp = _redeem(token_client, organizer, clist, p.pk, {})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {}, format='json')
resp = _redeem(token_client, organizer, clist, p.pk, {})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
with scopes_disabled():
@@ -815,14 +811,10 @@ def test_allow_multiple_reupload_same_nonce(token_client, organizer, clist, even
clist.save()
with scopes_disabled():
p = order.positions.first()
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {'nonce': 'foobar'}, format='json')
resp = _redeem(token_client, organizer, clist, p.pk, {'nonce': 'foobar'})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {'nonce': 'foobar'}, format='json')
resp = _redeem(token_client, organizer, clist, p.pk, {'nonce': 'foobar'})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
with scopes_disabled():
@@ -833,14 +825,10 @@ def test_allow_multiple_reupload_same_nonce(token_client, organizer, clist, even
def test_multiple_different_list(token_client, organizer, clist, clist_all, event, order):
with scopes_disabled():
p = order.positions.first()
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {'nonce': 'foobar'}, format='json')
resp = _redeem(token_client, organizer, clist, p.pk, {'nonce': 'foobar'})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist_all.pk, p.pk
), {'nonce': 'baz'}, format='json')
resp = _redeem(token_client, organizer, clist_all, p.pk, {'nonce': 'baz'})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
@@ -849,14 +837,10 @@ def test_multiple_different_list(token_client, organizer, clist, clist_all, even
def test_forced_multiple(token_client, organizer, clist, event, order):
with scopes_disabled():
p = order.positions.first()
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {}, format='json')
resp = _redeem(token_client, organizer, clist, p.pk, {})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {'force': True}, format='json')
resp = _redeem(token_client, organizer, clist, p.pk, {'force': True})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
@@ -865,17 +849,13 @@ def test_forced_multiple(token_client, organizer, clist, event, order):
def test_forced_flag_set_if_required(token_client, organizer, clist, event, order):
with scopes_disabled():
p = order.positions.first()
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {'force': True}, format='json')
resp = _redeem(token_client, organizer, clist, p.pk, {'force': True})
with scopes_disabled():
assert not p.checkins.order_by('pk').last().forced
assert p.checkins.order_by('pk').last().force_sent
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {'force': True}, format='json')
resp = _redeem(token_client, organizer, clist, p.pk, {'force': True})
with scopes_disabled():
assert p.checkins.order_by('pk').last().forced
assert p.checkins.order_by('pk').last().force_sent
@@ -889,9 +869,7 @@ def test_require_product(token_client, organizer, clist, event, order):
clist.limit_products.clear()
p = order.positions.first()
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {}, format='json')
resp = _redeem(token_client, organizer, clist, p.pk, {})
assert resp.status_code == 400
assert resp.data['status'] == 'error'
assert resp.data['reason'] == 'product'
@@ -904,32 +882,24 @@ def test_require_paid(token_client, organizer, clist, event, order):
order.status = Order.STATUS_CANCELED
order.save()
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {}, format='json')
resp = _redeem(token_client, organizer, clist, p.pk, {})
assert resp.status_code == 400
assert resp.data['status'] == 'error'
assert resp.data['reason'] == 'unpaid'
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {'canceled_supported': True}, format='json')
resp = _redeem(token_client, organizer, clist, p.pk, {'canceled_supported': True})
assert resp.status_code == 400
assert resp.data['status'] == 'error'
assert resp.data['reason'] == 'canceled'
order.status = Order.STATUS_PENDING
order.save()
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {}, format='json')
resp = _redeem(token_client, organizer, clist, p.pk, {})
assert resp.status_code == 400
assert resp.data['status'] == 'error'
assert resp.data['reason'] == 'unpaid'
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {'ignore_unpaid': True}, format='json')
resp = _redeem(token_client, organizer, clist, p.pk, {'ignore_unpaid': True})
assert resp.status_code == 400
assert resp.data['status'] == 'error'
assert resp.data['reason'] == 'unpaid'
@@ -937,16 +907,12 @@ def test_require_paid(token_client, organizer, clist, event, order):
clist.include_pending = True
clist.save()
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {}, format='json')
resp = _redeem(token_client, organizer, clist, p.pk, {})
assert resp.status_code == 400
assert resp.data['status'] == 'error'
assert resp.data['reason'] == 'unpaid'
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {'ignore_unpaid': True}, format='json')
resp = _redeem(token_client, organizer, clist, p.pk, {'ignore_unpaid': True})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
@@ -968,17 +934,13 @@ def test_question_number(token_client, organizer, clist, event, order, question)
question[0].type = 'N'
question[0].save()
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {}, format='json')
resp = _redeem(token_client, organizer, clist, p.pk, {})
assert resp.status_code == 400
assert resp.data['status'] == 'incomplete'
with scopes_disabled():
assert resp.data['questions'] == [QuestionSerializer(question[0]).data]
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {'answers': {question[0].pk: "3.24"}}, format='json')
resp = _redeem(token_client, organizer, clist, p.pk, {'answers': {question[0].pk: "3.24"}})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
with scopes_disabled():
@@ -989,17 +951,13 @@ def test_question_number(token_client, organizer, clist, event, order, question)
def test_question_choice(token_client, organizer, clist, event, order, question):
with scopes_disabled():
p = order.positions.first()
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {}, format='json')
resp = _redeem(token_client, organizer, clist, p.pk, {})
assert resp.status_code == 400
assert resp.data['status'] == 'incomplete'
with scopes_disabled():
assert resp.data['questions'] == [QuestionSerializer(question[0]).data]
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {'answers': {question[0].pk: str(question[1].pk)}}, format='json')
resp = _redeem(token_client, organizer, clist, p.pk, {'answers': {question[0].pk: str(question[1].pk)}})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
with scopes_disabled():
@@ -1011,17 +969,13 @@ def test_question_choice(token_client, organizer, clist, event, order, question)
def test_question_choice_identifier(token_client, organizer, clist, event, order, question):
with scopes_disabled():
p = order.positions.first()
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {}, format='json')
resp = _redeem(token_client, organizer, clist, p.pk, {})
assert resp.status_code == 400
assert resp.data['status'] == 'incomplete'
with scopes_disabled():
assert resp.data['questions'] == [QuestionSerializer(question[0]).data]
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {'answers': {question[0].pk: str(question[1].identifier)}}, format='json')
resp = _redeem(token_client, organizer, clist, p.pk, {'answers': {question[0].pk: str(question[1].identifier)}})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
with scopes_disabled():
@@ -1033,9 +987,7 @@ def test_question_choice_identifier(token_client, organizer, clist, event, order
def test_question_invalid(token_client, organizer, clist, event, order, question):
with scopes_disabled():
p = order.positions.first()
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {'answers': {question[0].pk: "A"}}, format='json')
resp = _redeem(token_client, organizer, clist, p.pk, {'answers': {question[0].pk: "A"}})
assert resp.status_code == 400
assert resp.data['status'] == 'incomplete'
with scopes_disabled():
@@ -1049,17 +1001,13 @@ def test_question_required(token_client, organizer, clist, event, order, questio
question[0].required = True
question[0].save()
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {}, format='json')
resp = _redeem(token_client, organizer, clist, p.pk, {})
assert resp.status_code == 400
assert resp.data['status'] == 'incomplete'
with scopes_disabled():
assert resp.data['questions'] == [QuestionSerializer(question[0]).data]
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {'answers': {question[0].pk: ""}}, format='json')
resp = _redeem(token_client, organizer, clist, p.pk, {'answers': {question[0].pk: ""}})
assert resp.status_code == 400
assert resp.data['status'] == 'incomplete'
with scopes_disabled():
@@ -1073,17 +1021,13 @@ def test_question_optional(token_client, organizer, clist, event, order, questio
question[0].required = False
question[0].save()
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {}, format='json')
resp = _redeem(token_client, organizer, clist, p.pk, {'answers': {}})
assert resp.status_code == 400
assert resp.data['status'] == 'incomplete'
with scopes_disabled():
assert resp.data['questions'] == [QuestionSerializer(question[0]).data]
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {'answers': {question[0].pk: ""}}, format='json')
resp = _redeem(token_client, organizer, clist, p.pk, {'answers': {question[0].pk: ""}})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
@@ -1095,17 +1039,13 @@ def test_question_multiple_choice(token_client, organizer, clist, event, order,
question[0].type = 'M'
question[0].save()
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {}, format='json')
resp = _redeem(token_client, organizer, clist, p.pk, {})
assert resp.status_code == 400
assert resp.data['status'] == 'incomplete'
with scopes_disabled():
assert resp.data['questions'] == [QuestionSerializer(question[0]).data]
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {'answers': {question[0].pk: "{},{}".format(question[1].pk, question[2].pk)}}, format='json')
resp = _redeem(token_client, organizer, clist, p.pk, {'answers': {question[0].pk: "{},{}".format(question[1].pk, question[2].pk)}})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
with scopes_disabled():
@@ -1132,23 +1072,17 @@ def test_question_upload(token_client, organizer, clist, event, order, question)
question[0].type = 'F'
question[0].save()
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {}, format='json')
resp = _redeem(token_client, organizer, clist, p.pk, {})
assert resp.status_code == 400
assert resp.data['status'] == 'incomplete'
with scopes_disabled():
assert resp.data['questions'] == [QuestionSerializer(question[0]).data]
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {'answers': {question[0].pk: "invalid"}}, format='json')
resp = _redeem(token_client, organizer, clist, p.pk, {'answers': {question[0].pk: "invalid"}})
assert resp.status_code == 400
assert resp.data['status'] == 'incomplete'
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {'answers': {question[0].pk: file_id_png}}, format='json')
resp = _redeem(token_client, organizer, clist, p.pk, {'answers': {question[0].pk: file_id_png}})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
with scopes_disabled():
@@ -1200,11 +1134,7 @@ def test_store_failed(token_client, organizer, clist, event, order):
@pytest.mark.django_db
def test_redeem_unknown(token_client, organizer, clist, event, order):
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, 'unknown_secret'
), {
'force': True
}, format='json')
resp = _redeem(token_client, organizer, clist, 'unknown_secret', {'force': True})
assert resp.status_code == 404
assert resp.data["status"] == "error"
assert resp.data["reason"] == "invalid"
@@ -1217,10 +1147,7 @@ def test_redeem_unknown_revoked(token_client, organizer, clist, event, order):
with scopes_disabled():
p = order.positions.first()
event.revoked_secrets.create(position=p, secret='revoked_secret')
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, 'revoked_secret'
), {
}, format='json')
resp = _redeem(token_client, organizer, clist, 'revoked_secret', {})
assert resp.status_code == 400
assert resp.data["status"] == "error"
assert resp.data["reason"] == "revoked"
@@ -1233,11 +1160,7 @@ def test_redeem_unknown_revoked_force(token_client, organizer, clist, event, ord
with scopes_disabled():
p = order.positions.first()
event.revoked_secrets.create(position=p, secret='revoked_secret')
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, 'revoked_secret'
), {
'force': True
}, format='json')
resp = _redeem(token_client, organizer, clist, 'revoked_secret', {'force': True})
assert resp.status_code == 201
assert resp.data["status"] == "ok"
with scopes_disabled():
@@ -1252,11 +1175,7 @@ def test_redeem_unknown_legacy_device_bug(device, device_client, organizer, clis
device.software_brand = "pretixSCAN"
device.software_version = "1.11.1"
device.save()
resp = device_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, 'unknown_secret'
), {
'force': True
}, format='json')
resp = _redeem(device_client, organizer, clist, 'unknown_secret', {'force': True})
assert resp.status_code == 400
assert resp.data["status"] == "error"
assert resp.data["reason"] == "already_redeemed"
@@ -1266,11 +1185,7 @@ def test_redeem_unknown_legacy_device_bug(device, device_client, organizer, clis
device.software_brand = "pretixSCAN"
device.software_version = "1.11.2"
device.save()
resp = device_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, 'unknown_secret'
), {
'force': True
}, format='json')
resp = _redeem(device_client, organizer, clist, 'unknown_secret', {'force': True})
assert resp.status_code == 404
assert resp.data["status"] == "error"
assert resp.data["reason"] == "invalid"
@@ -1285,17 +1200,9 @@ def test_redeem_by_id_not_allowed_if_pretixscan(device, device_client, organizer
device.software_brand = "pretixSCAN"
device.software_version = "1.14.2"
device.save()
resp = device_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.pk
), {
'force': True
}, format='json')
resp = _redeem(device_client, organizer, clist, p.pk, {'force': True})
assert resp.status_code == 404
resp = device_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, p.secret
), {
'force': True
}, format='json')
resp = _redeem(device_client, organizer, clist, p.secret, {'force': True})
assert resp.status_code == 201
@@ -1323,10 +1230,7 @@ def test_redeem_addon_if_match_disabled(token_client, organizer, clist, other_it
clist.all_products = False
clist.save()
clist.limit_products.set([other_item])
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, 'z3fsn8jyufm5kpk768q69gkbyr5f4h6w'
), {
}, format='json')
resp = _redeem(token_client, organizer, clist, 'z3fsn8jyufm5kpk768q69gkbyr5f4h6w', {})
assert resp.status_code == 400
assert resp.data["status"] == "error"
assert resp.data["reason"] == "product"
@@ -1342,10 +1246,7 @@ def test_redeem_addon_if_match_enabled(token_client, organizer, clist, other_ite
clist.save()
clist.limit_products.set([other_item])
p = order.positions.first().addons.all().first()
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, 'z3fsn8jyufm5kpk768q69gkbyr5f4h6w'
), {
}, format='json')
resp = _redeem(token_client, organizer, clist, 'z3fsn8jyufm5kpk768q69gkbyr5f4h6w', {})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
assert resp.data['position']['attendee_name'] == 'Peter' # test propagation of names
@@ -1362,10 +1263,7 @@ def test_redeem_addon_if_match_ambiguous(token_client, organizer, clist, item, o
clist.addon_match = True
clist.save()
clist.limit_products.set([item, other_item])
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, 'z3fsn8jyufm5kpk768q69gkbyr5f4h6w'
), {
}, format='json')
resp = _redeem(token_client, organizer, clist, 'z3fsn8jyufm5kpk768q69gkbyr5f4h6w', {})
assert resp.status_code == 400
assert resp.data["status"] == "error"
assert resp.data["reason"] == "ambiguous"
@@ -1382,11 +1280,7 @@ def test_redeem_addon_if_match_and_revoked_force(token_client, organizer, clist,
clist.save()
clist.limit_products.set([other_item])
p = order.positions.first().addons.all().first()
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/{}/redeem/'.format(
organizer.slug, event.slug, clist.pk, 'revoked_secret'
), {
'force': True
}, format='json')
resp = _redeem(token_client, organizer, clist, 'revoked_secret', {'force': True})
assert resp.status_code == 201
assert resp.data["status"] == "ok"
with scopes_disabled():
@@ -1394,3 +1288,35 @@ def test_redeem_addon_if_match_and_revoked_force(token_client, organizer, clist,
assert ci.forced
assert ci.force_sent
assert ci.position == p
@pytest.mark.django_db
def test_search(token_client, organizer, event, clist, clist_all, item, other_item, order, django_assert_max_num_queries):
with scopes_disabled():
p1 = dict(TEST_ORDERPOSITION1_RES)
p1["id"] = order.positions.get(positionid=1).pk
p1["item"] = item.pk
with django_assert_max_num_queries(17):
resp = token_client.get('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/?search=z3fsn8jyu'.format(
organizer.slug, event.slug, clist_all.pk
))
assert resp.status_code == 200
assert [p1] == resp.data['results']
@pytest.mark.django_db
def test_checkin_pdf_data_requires_permission(token_client, event, team, organizer, clist_all, order):
resp = token_client.get('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/?search=z3fsn8jyu&pdf_data=true'.format(
organizer.slug, event.slug, clist_all.pk
))
assert resp.data['results'][0].get('pdf_data')
with scopes_disabled():
team.can_view_orders = False
team.can_change_orders = False
team.can_checkin_orders = True
team.save()
resp = token_client.get('/api/v1/organizers/{}/events/{}/checkinlists/{}/positions/?search=z3fsn8jyu&pdf_data=true'.format(
organizer.slug, event.slug, clist_all.pk
))
assert not resp.data['results'][0].get('pdf_data')

View File

@@ -0,0 +1,911 @@
#
# This file is part of pretix (Community Edition).
#
# Copyright (C) 2014-2020 Raphael Michel and contributors
# Copyright (C) 2020-2021 rami.io GmbH and contributors
#
# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General
# Public License as published by the Free Software Foundation in version 3 of the License.
#
# ADDITIONAL TERMS APPLY: Pursuant to Section 7 of the GNU Affero General Public License, additional terms are
# applicable granting you additional permissions and placing additional restrictions on your usage of this software.
# Please refer to the pretix LICENSE file to obtain the full terms applicable to this work. If you did not receive
# this file, see <https://pretix.eu/about/en/license>.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
# <https://www.gnu.org/licenses/>.
#
import datetime
from decimal import Decimal
from unittest import mock
import pytest
from django.core.files.base import ContentFile
from django.utils.timezone import now
from django_countries.fields import Country
from django_scopes import scopes_disabled
from i18nfield.strings import LazyI18nString
from pytz import UTC
from pretix.api.serializers.item import QuestionSerializer
from pretix.base.models import Checkin, InvoiceAddress, Order, OrderPosition
# Lots of this code is overlapping with test_checkin.py, and some of it is arguably redundant since it's triggering
# the same backend code paths (for now). However, this is SUCH a critical part of pretix that we don't want to take
# the risk of some day having differing implementations and missing vital test coverage.
@pytest.fixture
def item(event):
return event.items.create(name="Budget Ticket", default_price=23)
@pytest.fixture
def item_on_event2(event2):
return event2.items.create(name="Budget Ticket", default_price=23)
@pytest.fixture
def other_item(event):
return event.items.create(name="Budget Ticket", default_price=23)
@pytest.fixture
def order(event, item, other_item, taxrule):
testtime = datetime.datetime(2017, 12, 1, 10, 0, 0, tzinfo=UTC)
with mock.patch('django.utils.timezone.now') as mock_now:
mock_now.return_value = testtime
o = Order.objects.create(
code='FOO', event=event, email='dummy@dummy.test',
status=Order.STATUS_PAID, secret="k24fiuwvu8kxz3y1",
datetime=datetime.datetime(2017, 12, 1, 10, 0, 0, tzinfo=UTC),
expires=datetime.datetime(2017, 12, 10, 10, 0, 0, tzinfo=UTC),
total=46, locale='en'
)
InvoiceAddress.objects.create(order=o, company="Sample company", country=Country('NZ'))
op1 = OrderPosition.objects.create(
order=o,
positionid=1,
item=item,
variation=None,
price=Decimal("23"),
attendee_name_parts={'full_name': "Peter"},
secret="z3fsn8jyufm5kpk768q69gkbyr5f4h6w",
pseudonymization_id="ABCDEFGHKL",
)
OrderPosition.objects.create(
order=o,
positionid=2,
item=other_item,
variation=None,
price=Decimal("23"),
attendee_name_parts={'full_name': "Michael"},
secret="sf4HZG73fU6kwddgjg2QOusFbYZwVKpK",
pseudonymization_id="BACDEFGHKL",
)
OrderPosition.objects.create(
order=o,
positionid=3,
item=other_item,
addon_to=op1,
variation=None,
price=Decimal("0"),
secret="3u4ez6vrrbgb3wvezxhq446p548dt2wn",
pseudonymization_id="FOOBAR12345",
)
return o
@pytest.fixture
def order2(event2, item_on_event2):
testtime = datetime.datetime(2017, 12, 1, 10, 0, 0, tzinfo=UTC)
with mock.patch('django.utils.timezone.now') as mock_now:
mock_now.return_value = testtime
o = Order.objects.create(
code='BAR', event=event2, email='dummy@dummy.test',
status=Order.STATUS_PAID, secret="ylptCPNOxTyA",
datetime=datetime.datetime(2017, 12, 1, 10, 0, 0, tzinfo=UTC),
expires=datetime.datetime(2017, 12, 10, 10, 0, 0, tzinfo=UTC),
total=46, locale='en'
)
InvoiceAddress.objects.create(order=o, company="Sample company", country=Country('NZ'))
OrderPosition.objects.create(
order=o,
positionid=1,
item=item_on_event2,
variation=None,
price=Decimal("23"),
attendee_name_parts={'full_name': "John"},
secret="y8tPmyc5BEK2G9pifSNumwp4NXAaIE4P",
pseudonymization_id="A23456789",
)
OrderPosition.objects.create(
order=o,
positionid=2,
item=item_on_event2,
variation=None,
price=Decimal("23"),
attendee_name_parts={'full_name': "Paul"},
secret="xrahgLCfodoNOIZ4uxn75gNBM1bb6m1h",
pseudonymization_id="B23456797345",
)
return o
TEST_ORDERPOSITION1_RES = {
"id": 1,
"require_attention": False,
"order__status": "p",
"order": "FOO",
"positionid": 1,
"item": 1,
"variation": None,
"price": "23.00",
"attendee_name": "Peter",
"attendee_name_parts": {'full_name': "Peter"},
"attendee_email": None,
"voucher": None,
"tax_rate": "0.00",
"tax_value": "0.00",
"tax_rule": None,
"secret": "z3fsn8jyufm5kpk768q69gkbyr5f4h6w",
"addon_to": None,
"checkins": [],
"downloads": [],
"answers": [],
"seat": None,
"company": None,
"street": None,
"zipcode": None,
"city": None,
"country": None,
"state": None,
"subevent": None,
"pseudonymization_id": "ABCDEFGHKL",
}
@pytest.fixture
def clist(event, item):
c = event.checkin_lists.create(name="Default", all_products=False)
c.limit_products.add(item)
return c
@pytest.fixture
def clist_all(event, item):
c = event.checkin_lists.create(name="Default", all_products=True)
return c
@pytest.fixture
def clist_event2(event2):
c = event2.checkin_lists.create(name="Event 2", all_products=True)
return c
def _redeem(token_client, org, clist, p, body=None, query=''):
body = body or {}
if isinstance(clist, list):
body['lists'] = [c.pk for c in clist]
else:
body['lists'] = [clist.pk]
body['secret'] = p
return token_client.post('/api/v1/organizers/{}/checkinrpc/redeem/{}'.format(
org.slug, query,
), body, format='json')
@pytest.mark.django_db
def test_query_load(token_client, organizer, clist, event, order, django_assert_max_num_queries):
with scopes_disabled():
p = order.positions.first()
with django_assert_max_num_queries(30):
resp = _redeem(token_client, organizer, clist, p.secret)
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
@pytest.mark.django_db
def test_custom_datetime(token_client, organizer, clist, event, order):
dt = now() - datetime.timedelta(days=1)
dt = dt.replace(microsecond=0)
with scopes_disabled():
p = order.positions.first()
resp = _redeem(token_client, organizer, clist, p.secret, {
'datetime': dt.isoformat()
})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
with scopes_disabled():
assert Checkin.objects.last().datetime == dt
@pytest.mark.django_db
def test_name_fallback(token_client, organizer, clist, event, order):
order.invoice_address.name_parts = {'_legacy': 'Paul'}
order.invoice_address.save()
with scopes_disabled():
op = order.positions.first()
op.attendee_name_cached = None
op.attendee_name_parts = {}
op.save()
resp = _redeem(token_client, organizer, clist, op.secret, {})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
assert resp.data['position']['attendee_name'] == 'Paul'
assert resp.data['position']['attendee_name_parts'] == {'_legacy': 'Paul'}
@pytest.mark.django_db
def test_by_pk_not_allowed(token_client, organizer, clist, event, order):
with scopes_disabled():
p = order.positions.first()
resp = _redeem(token_client, organizer, clist, p.pk, {})
assert resp.status_code == 404
@pytest.mark.django_db
def test_by_secret(token_client, organizer, clist, event, order):
with scopes_disabled():
p = order.positions.first()
resp = _redeem(token_client, organizer, clist, p.secret, {})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
@pytest.mark.django_db
def test_by_secret_special_chars(token_client, organizer, clist, event, order):
with scopes_disabled():
p = order.positions.first()
p.secret = "abc+-/=="
p.save()
resp = _redeem(token_client, organizer, clist, p.secret, {})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
@pytest.mark.django_db
def test_only_once(token_client, organizer, clist, event, order):
with scopes_disabled():
p = order.positions.first()
resp = _redeem(token_client, organizer, clist, p.secret, {})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
resp = _redeem(token_client, organizer, clist, p.secret, {})
assert resp.status_code == 400
assert resp.data['status'] == 'error'
assert resp.data['reason'] == 'already_redeemed'
@pytest.mark.django_db
def test_reupload_same_nonce(token_client, organizer, clist, event, order):
with scopes_disabled():
p = order.positions.first()
resp = _redeem(token_client, organizer, clist, p.secret, {'nonce': 'foobar'})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
resp = _redeem(token_client, organizer, clist, p.secret, {'nonce': 'foobar'})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
@pytest.mark.django_db
def test_allow_multiple(token_client, organizer, clist, event, order):
clist.allow_multiple_entries = True
clist.save()
with scopes_disabled():
p = order.positions.first()
resp = _redeem(token_client, organizer, clist, p.secret, {})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
resp = _redeem(token_client, organizer, clist, p.secret, {})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
with scopes_disabled():
assert p.checkins.count() == 2
@pytest.mark.django_db
def test_allow_multiple_reupload_same_nonce(token_client, organizer, clist, event, order):
clist.allow_multiple_entries = True
clist.save()
with scopes_disabled():
p = order.positions.first()
resp = _redeem(token_client, organizer, clist, p.secret, {'nonce': 'foobar'})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
resp = _redeem(token_client, organizer, clist, p.secret, {'nonce': 'foobar'})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
with scopes_disabled():
assert p.checkins.count() == 1
@pytest.mark.django_db
def test_multiple_different_list(token_client, organizer, clist, clist_all, event, order):
with scopes_disabled():
p = order.positions.first()
resp = _redeem(token_client, organizer, clist, p.secret, {'nonce': 'foobar'})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
resp = _redeem(token_client, organizer, clist_all, p.secret, {'nonce': 'baz'})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
@pytest.mark.django_db
def test_forced_multiple(token_client, organizer, clist, event, order):
with scopes_disabled():
p = order.positions.first()
resp = _redeem(token_client, organizer, clist, p.secret, {})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
resp = _redeem(token_client, organizer, clist, p.secret, {'force': True})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
@pytest.mark.django_db
def test_forced_flag_set_if_required(token_client, organizer, clist, event, order):
with scopes_disabled():
p = order.positions.first()
resp = _redeem(token_client, organizer, clist, p.secret, {'force': True})
with scopes_disabled():
assert not p.checkins.order_by('pk').last().forced
assert p.checkins.order_by('pk').last().force_sent
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
resp = _redeem(token_client, organizer, clist, p.secret, {'force': True})
with scopes_disabled():
assert p.checkins.order_by('pk').last().forced
assert p.checkins.order_by('pk').last().force_sent
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
@pytest.mark.django_db
def test_require_product(token_client, organizer, clist, event, order):
with scopes_disabled():
clist.limit_products.clear()
p = order.positions.first()
resp = _redeem(token_client, organizer, clist, p.secret, {})
assert resp.status_code == 400
assert resp.data['status'] == 'error'
assert resp.data['reason'] == 'product'
@pytest.mark.django_db
def test_require_paid(token_client, organizer, clist, event, order):
with scopes_disabled():
p = order.positions.first()
order.status = Order.STATUS_CANCELED
order.save()
resp = _redeem(token_client, organizer, clist, p.secret, {})
assert resp.status_code == 400
assert resp.data['status'] == 'error'
assert resp.data['reason'] == 'canceled'
order.status = Order.STATUS_PENDING
order.save()
resp = _redeem(token_client, organizer, clist, p.secret, {})
assert resp.status_code == 400
assert resp.data['status'] == 'error'
assert resp.data['reason'] == 'unpaid'
resp = _redeem(token_client, organizer, clist, p.secret, {'ignore_unpaid': True})
assert resp.status_code == 400
assert resp.data['status'] == 'error'
assert resp.data['reason'] == 'unpaid'
clist.include_pending = True
clist.save()
resp = _redeem(token_client, organizer, clist, p.secret, {})
assert resp.status_code == 400
assert resp.data['status'] == 'error'
assert resp.data['reason'] == 'unpaid'
resp = _redeem(token_client, organizer, clist, p.secret, {'ignore_unpaid': True})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
@pytest.fixture
def question(event, item):
q = event.questions.create(question=LazyI18nString('Size'), type='C', required=True, ask_during_checkin=True)
a1 = q.options.create(answer=LazyI18nString("M"))
a2 = q.options.create(answer=LazyI18nString("L"))
q.items.add(item)
return q, a1, a2
@pytest.mark.django_db
def test_question_number(token_client, organizer, clist, event, order, question):
with scopes_disabled():
p = order.positions.first()
question[0].options.all().delete()
question[0].type = 'N'
question[0].save()
resp = _redeem(token_client, organizer, clist, p.secret, {})
assert resp.status_code == 400
assert resp.data['status'] == 'incomplete'
with scopes_disabled():
assert resp.data['questions'] == [QuestionSerializer(question[0]).data]
resp = _redeem(token_client, organizer, clist, p.secret, {'answers': {question[0].pk: "3.24"}})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
with scopes_disabled():
assert order.positions.first().answers.get(question=question[0]).answer == '3.24'
@pytest.mark.django_db
def test_question_choice(token_client, organizer, clist, event, order, question):
with scopes_disabled():
p = order.positions.first()
resp = _redeem(token_client, organizer, clist, p.secret, {})
assert resp.status_code == 400
assert resp.data['status'] == 'incomplete'
with scopes_disabled():
assert resp.data['questions'] == [QuestionSerializer(question[0]).data]
resp = _redeem(token_client, organizer, clist, p.secret, {'answers': {question[0].pk: str(question[1].pk)}})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
with scopes_disabled():
assert order.positions.first().answers.get(question=question[0]).answer == 'M'
assert list(order.positions.first().answers.get(question=question[0]).options.all()) == [question[1]]
@pytest.mark.django_db
def test_question_choice_identifier(token_client, organizer, clist, event, order, question):
with scopes_disabled():
p = order.positions.first()
resp = _redeem(token_client, organizer, clist, p.secret, {})
assert resp.status_code == 400
assert resp.data['status'] == 'incomplete'
with scopes_disabled():
assert resp.data['questions'] == [QuestionSerializer(question[0]).data]
resp = _redeem(token_client, organizer, clist, p.secret, {'answers': {question[0].pk: str(question[1].identifier)}})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
with scopes_disabled():
assert order.positions.first().answers.get(question=question[0]).answer == 'M'
assert list(order.positions.first().answers.get(question=question[0]).options.all()) == [question[1]]
@pytest.mark.django_db
def test_question_invalid(token_client, organizer, clist, event, order, question):
with scopes_disabled():
p = order.positions.first()
resp = _redeem(token_client, organizer, clist, p.secret, {'answers': {question[0].pk: "A"}})
assert resp.status_code == 400
assert resp.data['status'] == 'incomplete'
with scopes_disabled():
assert resp.data['questions'] == [QuestionSerializer(question[0]).data]
@pytest.mark.django_db
def test_question_required(token_client, organizer, clist, event, order, question):
with scopes_disabled():
p = order.positions.first()
question[0].required = True
question[0].save()
resp = _redeem(token_client, organizer, clist, p.secret, {})
assert resp.status_code == 400
assert resp.data['status'] == 'incomplete'
with scopes_disabled():
assert resp.data['questions'] == [QuestionSerializer(question[0]).data]
resp = _redeem(token_client, organizer, clist, p.secret, {'answers': {question[0].pk: ""}})
assert resp.status_code == 400
assert resp.data['status'] == 'incomplete'
with scopes_disabled():
assert resp.data['questions'] == [QuestionSerializer(question[0]).data]
@pytest.mark.django_db
def test_question_optional(token_client, organizer, clist, event, order, question):
with scopes_disabled():
p = order.positions.first()
question[0].required = False
question[0].save()
resp = _redeem(token_client, organizer, clist, p.secret, {'answers': {}})
assert resp.status_code == 400
assert resp.data['status'] == 'incomplete'
with scopes_disabled():
assert resp.data['questions'] == [QuestionSerializer(question[0]).data]
resp = _redeem(token_client, organizer, clist, p.secret, {'answers': {question[0].pk: ""}})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
@pytest.mark.django_db
def test_question_multiple_choice(token_client, organizer, clist, event, order, question):
with scopes_disabled():
p = order.positions.first()
question[0].type = 'M'
question[0].save()
resp = _redeem(token_client, organizer, clist, p.secret, {})
assert resp.status_code == 400
assert resp.data['status'] == 'incomplete'
with scopes_disabled():
assert resp.data['questions'] == [QuestionSerializer(question[0]).data]
resp = _redeem(token_client, organizer, clist, p.secret,
{'answers': {question[0].pk: "{},{}".format(question[1].pk, question[2].pk)}})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
with scopes_disabled():
assert order.positions.first().answers.get(question=question[0]).answer == 'M, L'
assert set(order.positions.first().answers.get(question=question[0]).options.all()) == {question[1],
question[2]}
@pytest.mark.django_db
def test_question_upload(token_client, organizer, clist, event, order, question):
r = token_client.post(
'/api/v1/upload',
data={
'media_type': 'image/png',
'file': ContentFile('file.png', 'invalid png content')
},
format='upload',
HTTP_CONTENT_DISPOSITION='attachment; filename="file.png"',
)
assert r.status_code == 201
file_id_png = r.data['id']
with scopes_disabled():
p = order.positions.first()
question[0].type = 'F'
question[0].save()
resp = _redeem(token_client, organizer, clist, p.secret, {})
assert resp.status_code == 400
assert resp.data['status'] == 'incomplete'
with scopes_disabled():
assert resp.data['questions'] == [QuestionSerializer(question[0]).data]
resp = _redeem(token_client, organizer, clist, p.secret, {'answers': {question[0].pk: "invalid"}})
assert resp.status_code == 400
assert resp.data['status'] == 'incomplete'
resp = _redeem(token_client, organizer, clist, p.secret, {'answers': {question[0].pk: file_id_png}})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
with scopes_disabled():
assert order.positions.first().answers.get(question=question[0]).answer.startswith('file://')
assert order.positions.first().answers.get(question=question[0]).file
@pytest.mark.django_db
def test_store_failed(token_client, organizer, clist, event, order):
with scopes_disabled():
p = order.positions.first()
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/failed_checkins/'.format(
organizer.slug, event.slug, clist.pk,
), {
'raw_barcode': '123456',
'error_reason': 'invalid'
}, format='json')
assert resp.status_code == 201
with scopes_disabled():
assert Checkin.all.filter(successful=False).exists()
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/failed_checkins/'.format(
organizer.slug, event.slug, clist.pk,
), {
'raw_barcode': '123456',
'position': p.pk,
'error_reason': 'unpaid'
}, format='json')
assert resp.status_code == 201
with scopes_disabled():
assert p.all_checkins.filter(successful=False).count() == 1
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/failed_checkins/'.format(
organizer.slug, event.slug, clist.pk,
), {
'position': p.pk,
'error_reason': 'unpaid'
}, format='json')
assert resp.status_code == 400
resp = token_client.post('/api/v1/organizers/{}/events/{}/checkinlists/{}/failed_checkins/'.format(
organizer.slug, event.slug, clist.pk,
), {
'raw_barcode': '123456',
'error_reason': 'unknown'
}, format='json')
assert resp.status_code == 400
@pytest.mark.django_db
def test_redeem_unknown(token_client, organizer, clist, event, order):
resp = _redeem(token_client, organizer, clist, 'unknown_secret', {'force': True})
assert resp.status_code == 404
assert resp.data["status"] == "error"
assert resp.data["reason"] == "invalid"
with scopes_disabled():
assert not Checkin.objects.last()
@pytest.mark.django_db
def test_redeem_unknown_revoked(token_client, organizer, clist, event, order):
with scopes_disabled():
p = order.positions.first()
event.revoked_secrets.create(position=p, secret='revoked_secret')
resp = _redeem(token_client, organizer, clist, 'revoked_secret', {})
assert resp.status_code == 400
assert resp.data["status"] == "error"
assert resp.data["reason"] == "revoked"
with scopes_disabled():
assert not Checkin.objects.last()
@pytest.mark.django_db
def test_redeem_unknown_revoked_force(token_client, organizer, clist, event, order):
with scopes_disabled():
p = order.positions.first()
event.revoked_secrets.create(position=p, secret='revoked_secret')
resp = _redeem(token_client, organizer, clist, 'revoked_secret', {'force': True})
assert resp.status_code == 201
assert resp.data["status"] == "ok"
with scopes_disabled():
ci = Checkin.objects.last()
assert ci.forced
assert ci.force_sent
assert ci.position == p
@pytest.mark.django_db
def test_redeem_addon_if_match_disabled(token_client, organizer, clist, other_item, event, order):
with scopes_disabled():
clist.all_products = False
clist.save()
clist.limit_products.set([other_item])
resp = _redeem(token_client, organizer, clist, 'z3fsn8jyufm5kpk768q69gkbyr5f4h6w', {})
assert resp.status_code == 400
assert resp.data["status"] == "error"
assert resp.data["reason"] == "product"
with scopes_disabled():
assert not Checkin.objects.last()
@pytest.mark.django_db
def test_redeem_addon_if_match_enabled(token_client, organizer, clist, other_item, event, order):
with scopes_disabled():
clist.all_products = False
clist.addon_match = True
clist.save()
clist.limit_products.set([other_item])
p = order.positions.first().addons.all().first()
resp = _redeem(token_client, organizer, clist, 'z3fsn8jyufm5kpk768q69gkbyr5f4h6w', {})
assert resp.status_code == 201
assert resp.data['status'] == 'ok'
assert resp.data['position']['attendee_name'] == 'Peter' # test propagation of names
assert resp.data['position']['item'] == other_item.pk
with scopes_disabled():
ci = Checkin.objects.last()
assert ci.position == p
@pytest.mark.django_db
def test_redeem_addon_if_match_ambiguous(token_client, organizer, clist, item, other_item, event, order):
with scopes_disabled():
clist.all_products = False
clist.addon_match = True
clist.save()
clist.limit_products.set([item, other_item])
resp = _redeem(token_client, organizer, clist, 'z3fsn8jyufm5kpk768q69gkbyr5f4h6w', {})
assert resp.status_code == 400
assert resp.data["status"] == "error"
assert resp.data["reason"] == "ambiguous"
with scopes_disabled():
assert not Checkin.objects.last()
@pytest.mark.django_db
def test_redeem_addon_if_match_and_revoked_force(token_client, organizer, clist, other_item, event, order):
with scopes_disabled():
event.revoked_secrets.create(position=order.positions.get(positionid=1), secret='revoked_secret')
clist.all_products = False
clist.addon_match = True
clist.save()
clist.limit_products.set([other_item])
p = order.positions.first().addons.all().first()
resp = _redeem(token_client, organizer, clist, 'revoked_secret', {'force': True})
assert resp.status_code == 201
assert resp.data["status"] == "ok"
with scopes_disabled():
ci = Checkin.objects.last()
assert ci.forced
assert ci.force_sent
assert ci.position == p
@pytest.mark.django_db
def test_redeem_multi_list(token_client, organizer, clist, clist_event2, order, order2):
with scopes_disabled():
p = order.positions.first()
p2 = order2.positions.first()
resp = _redeem(token_client, organizer, [clist, clist_event2], p.secret)
assert resp.status_code == 201
assert resp.data['position']['id'] == p.pk
assert resp.data['list'] == {'id': clist.pk, 'name': 'Default', 'event': 'dummy', 'subevent': None, 'include_pending': False}
resp = _redeem(token_client, organizer, [clist, clist_event2], p2.secret)
assert resp.status_code == 201
assert resp.data['position']['id'] == p2.pk
assert resp.data['list'] == {'id': clist_event2.pk, 'name': 'Event 2', 'event': 'dummy2', 'subevent': None, 'include_pending': False}
resp = _redeem(token_client, organizer, [clist], p2.secret)
assert resp.status_code == 404
@pytest.mark.django_db
def test_redeem_no_list(token_client, organizer, clist, event, order):
with scopes_disabled():
p = order.positions.first()
resp = _redeem(token_client, organizer, [], p.secret)
assert resp.status_code == 400
assert resp.data == ['No check-in list passed.']
@pytest.mark.django_db
def test_redeem_conflicting_lists(token_client, organizer, clist, clist_all, event, order):
with scopes_disabled():
p = order.positions.first()
resp = _redeem(token_client, organizer, [clist_all, clist], p.secret)
assert resp.status_code == 400
assert resp.data == ['Selecting two check-in lists from the same event is unsupported.']
@pytest.mark.django_db
def test_search(token_client, organizer, event, clist, clist_all, item, other_item, order,
django_assert_max_num_queries):
with scopes_disabled():
p1 = dict(TEST_ORDERPOSITION1_RES)
p1["id"] = order.positions.get(positionid=1).pk
p1["item"] = item.pk
with django_assert_max_num_queries(17):
resp = token_client.get(
'/api/v1/organizers/{}/checkinrpc/search/?list={}&search=z3fsn8jyu'.format(organizer.slug, clist_all.pk))
assert resp.status_code == 200
assert [p1] == resp.data['results']
@pytest.mark.django_db
def test_search_no_list(token_client, organizer, event, clist, clist_all, item, other_item, order):
resp = token_client.get(
'/api/v1/organizers/{}/checkinrpc/search/?search=z3fsn8jyu'.format(organizer.slug))
assert resp.status_code == 400
assert resp.data == ['No check-in list passed.']
@pytest.mark.django_db
def test_search_conflicting_lists(token_client, organizer, event, clist, clist_all, item, other_item, order):
resp = token_client.get(
'/api/v1/organizers/{}/checkinrpc/search/?search=z3fsn8jyu&list={}&list={}'.format(organizer.slug, clist.pk, clist_all.pk))
assert resp.status_code == 400
assert resp.data == ['Selecting two check-in lists from the same event is unsupported.']
@pytest.mark.django_db
def test_search_multiple_lists(token_client, organizer, clist_all, clist_event2, order2, order):
resp = token_client.get(
'/api/v1/organizers/{}/checkinrpc/search/?list={}&list={}&search=dummy.test&ordering=attendee_name'.format(
organizer.slug, clist_all.pk, clist_event2.pk
)
)
assert resp.status_code == 200
with scopes_disabled():
assert resp.data['results'][0]['id'] == order2.positions.get(positionid=1).pk
assert resp.data['results'][1]['id'] == order.positions.get(positionid=2).pk
@pytest.mark.django_db
def test_without_permission(token_client, event, team, organizer, clist_all, order):
with scopes_disabled():
team.can_view_orders = False
team.can_change_orders = False
team.can_checkin_orders = False
team.save()
resp = token_client.get(
'/api/v1/organizers/{}/checkinrpc/search/?list={}&search=dummy.test&ordering=attendee_name'.format(organizer.slug, clist_all.pk))
assert resp.status_code == 403
assert resp.data == {
"detail": f"You requested lists that do not exist or that you do not have access to: {clist_all.pk}"
}
resp = _redeem(token_client, organizer, [clist_all], "foobar")
assert resp.status_code == 400
assert resp.data == {
"lists": [f'Invalid pk "{clist_all.pk}" - object does not exist.']
}
@pytest.mark.django_db
def test_without_permission_for_one_list(token_client, event, team, organizer, clist_all, clist_event2, order2, order):
with scopes_disabled():
team.all_events = False
team.save()
team.limit_events.set([event])
resp = token_client.get(
'/api/v1/organizers/{}/checkinrpc/search/?list={}&list={}&search=dummy.test&ordering=attendee_name'.format(
organizer.slug, clist_all.pk, clist_event2.pk
)
)
assert resp.status_code == 403
assert resp.data == {
"detail": f"You requested lists that do not exist or that you do not have access to: {clist_event2.pk}"
}
resp = _redeem(token_client, organizer, [clist_all, clist_event2], "foobar")
assert resp.status_code == 400
assert resp.data == {
"lists": [f'Invalid pk "{clist_event2.pk}" - object does not exist.']
}
@pytest.mark.django_db
def test_checkin_only_permission(token_client, event, team, organizer, clist_all, order):
with scopes_disabled():
p = order.positions.first()
clist_all.allow_multiple_entries = True
clist_all.save()
# With all permissions, I can submit very short search terms
resp = token_client.get(
'/api/v1/organizers/{}/checkinrpc/search/?list={}&search=du&ordering=attendee_name'.format(organizer.slug, clist_all.pk))
assert resp.data['count'] > 0
# With all permissions, I can request PDF data during checkin
resp = _redeem(token_client, organizer, [clist_all], p.secret, {}, '?pdf_data=true')
assert resp.status_code == 201
assert resp.data['position'].get('pdf_data')
with scopes_disabled():
team.can_view_orders = False
team.can_change_orders = False
team.can_checkin_orders = True
team.save()
# With limited permissions, I can not search with a 2-character query
resp = token_client.get(
'/api/v1/organizers/{}/checkinrpc/search/?list={}&search=du&ordering=attendee_name'.format(organizer.slug, clist_all.pk))
assert resp.status_code == 200
assert resp.data['count'] == 0
# With limited permissions, I can search with a 4-character query
resp = token_client.get(
'/api/v1/organizers/{}/checkinrpc/search/?list={}&search=dummy&ordering=attendee_name'.format(organizer.slug, clist_all.pk))
assert resp.status_code == 200
assert resp.data['count'] > 0
# With all permissions, I can not request PDF data during checkin
resp = _redeem(token_client, organizer, [clist_all], p.secret, {}, '?pdf_data=true')
assert resp.status_code == 201
assert not resp.data['position'].get('pdf_data')
@pytest.mark.django_db
def test_checkin_no_pdf_data(token_client, event, team, organizer, clist_all, order):
resp = token_client.get(
'/api/v1/organizers/{}/checkinrpc/search/?list={}&search=dummy&pdf_data=true'.format(organizer.slug, clist_all.pk))
assert not resp.data['results'][0].get('pdf_data')