From 4a2ac110b33d629d56bc04f2a367fc86dc5cbbab Mon Sep 17 00:00:00 2001 From: Raphael Michel Date: Sun, 14 Mar 2021 18:19:49 +0100 Subject: [PATCH] Voucher bulk creation: More efficient implementation and async task --- src/pretix/base/services/vouchers.py | 27 +-- src/pretix/base/views/tasks.py | 160 ++++++++++++++---- src/pretix/control/forms/vouchers.py | 31 +--- src/pretix/control/signals.py | 5 + .../pretixcontrol/vouchers/bulk.html | 2 +- src/pretix/control/views/vouchers.py | 116 +++++++++---- src/pretix/helpers/models.py | 4 +- src/pretix/settings.py | 2 +- 8 files changed, 244 insertions(+), 103 deletions(-) diff --git a/src/pretix/base/services/vouchers.py b/src/pretix/base/services/vouchers.py index d39bdc9cb1..4782d42286 100644 --- a/src/pretix/base/services/vouchers.py +++ b/src/pretix/base/services/vouchers.py @@ -3,22 +3,21 @@ from i18nfield.strings import LazyI18nString from pretix.base.email import get_email_context from pretix.base.i18n import language -from pretix.base.models import Event, User, Voucher +from pretix.base.models import Event, LogEntry, User, Voucher from pretix.base.services.mail import mail -from pretix.base.services.tasks import TransactionAwareProfiledEventTask -from pretix.celery_app import app -@app.task(base=TransactionAwareProfiledEventTask, acks_late=True) -def vouchers_send(event: Event, vouchers: list, subject: str, message: str, recipients: list, user: int) -> None: +def vouchers_send(event: Event, vouchers: list, subject: str, message: str, recipients: list, user: int, + progress=None) -> None: vouchers = list(Voucher.objects.filter(id__in=vouchers).order_by('id')) user = User.objects.get(pk=user) - for r in recipients: + for ir, r in enumerate(recipients): voucher_list = [] for i in range(r['number']): voucher_list.append(vouchers.pop()) with language(event.settings.locale): - email_context = get_email_context(event=event, name=r.get('name') or '', voucher_list=[v.code for v in voucher_list]) + email_context = get_email_context(event=event, name=r.get('name') or '', + voucher_list=[v.code for v in voucher_list]) mail( r['email'], subject, @@ -27,14 +26,14 @@ def vouchers_send(event: Event, vouchers: list, subject: str, message: str, reci event, locale=event.settings.locale, ) + logs = [] for v in voucher_list: if r.get('tag') and r.get('tag') != v.tag: v.tag = r.get('tag') if v.comment: v.comment += '\n\n' v.comment = gettext('The voucher has been sent to {recipient}.').format(recipient=r['email']) - v.save(update_fields=['tag', 'comment']) - v.log_action( + logs.append(v.log_action( 'pretix.voucher.sent', user=user, data={ @@ -42,5 +41,11 @@ def vouchers_send(event: Event, vouchers: list, subject: str, message: str, reci 'name': r.get('name'), 'subject': subject, 'message': message, - } - ) + }, + save=False + )) + Voucher.objects.bulk_update(voucher_list, fields=['comment', 'tag'], batch_size=500) + LogEntry.objects.bulk_create(logs, batch_size=500) + + if progress and ir % 50 == 0: + progress(ir / len(recipients)) diff --git a/src/pretix/base/views/tasks.py b/src/pretix/base/views/tasks.py index 76f4678090..2cf777ecfa 100644 --- a/src/pretix/base/views/tasks.py +++ b/src/pretix/base/views/tasks.py @@ -4,43 +4,25 @@ import celery.exceptions from celery.result import AsyncResult from django.conf import settings from django.contrib import messages +from django.core.exceptions import ValidationError from django.http import JsonResponse from django.shortcuts import redirect, render +from django.test import RequestFactory from django.utils.translation import gettext as _ +from django.views.generic import FormView +from pretix.base.models import User +from pretix.base.services.tasks import ProfiledEventTask from pretix.celery_app import app logger = logging.getLogger('pretix.base.tasks') -class AsyncAction: - task = None +class AsyncMixin: success_url = None error_url = None known_errortypes = [] - def do(self, *args, **kwargs): - if not isinstance(self.task, app.Task): - raise TypeError('Method has no task attached') - - try: - res = self.task.apply_async(args=args, kwargs=kwargs) - except ConnectionError: - # Task very likely not yet sent, due to redis restarting etc. Let's try once agan - res = self.task.apply_async(args=args, kwargs=kwargs) - - if 'ajax' in self.request.GET or 'ajax' in self.request.POST: - data = self._return_ajax_result(res) - data['check_url'] = self.get_check_url(res.id, True) - return JsonResponse(data) - else: - if res.ready(): - if res.successful() and not isinstance(res.info, Exception): - return self.success(res.info) - else: - return self.error(res.info) - return redirect(self.get_check_url(res.id, False)) - def get_success_url(self, value): return self.success_url @@ -50,11 +32,6 @@ class AsyncAction: def get_check_url(self, task_id, ajax): return self.request.path + '?async_id=%s' % task_id + ('&ajax=1' if ajax else '') - def get(self, request, *args, **kwargs): - if 'async_id' in request.GET and settings.HAS_CELERY: - return self.get_result(request) - return self.http_method_not_allowed(request) - def _ajax_response_data(self): return {} @@ -86,7 +63,7 @@ class AsyncAction: if smes: messages.success(self.request, smes) # TODO: Do not store message if the ajax client states that it will not redirect - # but handle the mssage itself + # but handle the message itself data.update({ 'redirect': self.get_success_url(res.info), 'success': True, @@ -95,7 +72,7 @@ class AsyncAction: else: messages.error(self.request, self.get_error_message(res.info)) # TODO: Do not store message if the ajax client states that it will not redirect - # but handle the mssage itself + # but handle the message itself data.update({ 'redirect': self.get_error_url(), 'success': False, @@ -159,3 +136,124 @@ class AsyncAction: def get_success_message(self, value): return _('The task has been completed.') + + +class AsyncAction(AsyncMixin): + task = None + + def do(self, *args, **kwargs): + if not isinstance(self.task, app.Task): + raise TypeError('Method has no task attached') + + try: + res = self.task.apply_async(args=args, kwargs=kwargs) + except ConnectionError: + # Task very likely not yet sent, due to redis restarting etc. Let's try once again + res = self.task.apply_async(args=args, kwargs=kwargs) + + if 'ajax' in self.request.GET or 'ajax' in self.request.POST: + data = self._return_ajax_result(res) + data['check_url'] = self.get_check_url(res.id, True) + return JsonResponse(data) + else: + if res.ready(): + if res.successful() and not isinstance(res.info, Exception): + return self.success(res.info) + else: + return self.error(res.info) + return redirect(self.get_check_url(res.id, False)) + + def get(self, request, *args, **kwargs): + if 'async_id' in request.GET and settings.HAS_CELERY: + return self.get_result(request) + return self.http_method_not_allowed(request) + + +class AsyncFormView(AsyncMixin, FormView): + """ + FormView variant in which instead of ``form_valid``, an ``async_form_valid`` + is executed in a celery task. Note that this places some severe limitations + on the form and the view, e.g. neither ``get_form*`` nor the form itself + may depend on the request object unless specifically supported by this class. + Also, all form keyword arguments except ``instance`` need to be serializable. + """ + known_errortypes = ['ValidationError'] + + def __init_subclass__(cls): + def async_execute(self, request_path, form_kwargs, organizer=None, event=None, user=None): + view_instance = cls() + view_instance.request = RequestFactory().post(request_path) + if organizer: + view_instance.request.event = event + if organizer: + view_instance.request.organizer = organizer + if user: + view_instance.request.user = User.objects.get(pk=user) + + form_class = view_instance.get_form_class() + if form_kwargs.get('instance'): + cls.model.objects.get(pk=form_kwargs['instance']) + + form_kwargs = view_instance.get_async_form_kwargs(form_kwargs, organizer, event) + + form = form_class(**form_kwargs) + return view_instance.async_form_valid(self, form) + + cls.async_execute = app.task( + base=ProfiledEventTask, + bind=True, + name=cls.__module__ + '.' + cls.__name__ + '.async_execute', + throws=(ValidationError,) + )(async_execute) + + def async_form_valid(self, task, form): + pass + + def get_async_form_kwargs(self, form_kwargs, organizer=None, event=None): + return form_kwargs + + def get(self, request, *args, **kwargs): + if 'async_id' in request.GET and settings.HAS_CELERY: + return self.get_result(request) + return super().get(request, *args, **kwargs) + + def form_valid(self, form): + if form.files: + raise TypeError('File upload currently not supported in AsyncFormView') + form_kwargs = { + k: v for k, v in self.get_form_kwargs().items() + } + if form_kwargs.get('instance'): + if form_kwargs['instance'].pk: + form_kwargs['instance'] = form_kwargs['instance'].pk + else: + form_kwargs['instance'] = None + form_kwargs.setdefault('data', {}) + kwargs = { + 'request_path': self.request.path, + 'form_kwargs': form_kwargs, + } + if hasattr(self.request, 'organizer'): + kwargs['organizer'] = self.request.organizer.pk + if self.request.user.is_authenticated: + kwargs['user'] = self.request.user.pk + if hasattr(self.request, 'event'): + kwargs['event'] = self.request.event.pk + + try: + res = type(self).async_execute.apply_async(kwargs=kwargs) + except ConnectionError: + # Task very likely not yet sent, due to redis restarting etc. Let's try once again + res = type(self).async_execute.apply_async(kwargs=kwargs) + + if 'ajax' in self.request.GET or 'ajax' in self.request.POST: + data = self._return_ajax_result(res) + data['check_url'] = self.get_check_url(res.id, True) + return JsonResponse(data) + else: + if res.ready(): + if res.successful() and not isinstance(res.info, Exception): + return self.success(res.info) + else: + return self.error(res.info) + return redirect(self.get_check_url(res.id, False)) diff --git a/src/pretix/control/forms/vouchers.py b/src/pretix/control/forms/vouchers.py index 175226d5e9..9952271007 100644 --- a/src/pretix/control/forms/vouchers.py +++ b/src/pretix/control/forms/vouchers.py @@ -5,7 +5,7 @@ from io import StringIO from django import forms from django.core.exceptions import ObjectDoesNotExist, ValidationError from django.core.validators import EmailValidator -from django.db.models.functions import Lower +from django.db.models.functions import Upper from django.urls import reverse from django.utils.translation import gettext_lazy as _, pgettext_lazy from django_scopes.forms import SafeModelChoiceField @@ -346,8 +346,8 @@ class VoucherBulkForm(VoucherForm): data = super().clean() vouchers = self.instance.event.vouchers.annotate( - code_lower=Lower('code') - ).filter(code_lower__in=[c.lower() for c in data['codes']]) + code_upper=Upper('code') + ).filter(code_upper__in=[c.upper() for c in data['codes']]) if vouchers.exists(): raise ValidationError(_('A voucher with one of these codes already exists.')) @@ -377,26 +377,5 @@ class VoucherBulkForm(VoucherForm): return data - def save(self, event, *args, **kwargs): - objs = [] - for code in self.cleaned_data['codes']: - obj = modelcopy(self.instance) - obj.event = event - obj.code = code - try: - obj.seat = self.cleaned_data['seats'].pop() - obj.item = obj.seat.product - except IndexError: - pass - data = dict(self.cleaned_data) - data['code'] = code - data['bulk'] = True - del data['codes'] - objs.append(obj) - Voucher.objects.bulk_create(objs, batch_size=200) - objs = [] - for v in event.vouchers.filter(code__in=self.cleaned_data['codes']): - # We need to query them again as bulk_create does not fill in .pk values on databases - # other than PostgreSQL - objs.append(v) - return objs + def post_bulk_save(self, objs): + pass diff --git a/src/pretix/control/signals.py b/src/pretix/control/signals.py index 7dddb86d09..090b0d189b 100644 --- a/src/pretix/control/signals.py +++ b/src/pretix/control/signals.py @@ -154,6 +154,11 @@ This signal allows you to replace the form class that is used for modifying vouc You will receive the default form class (or the class set by a previous plugin) in the ``cls`` argument so that you can inherit from it. +Note that this is also called for the voucher bulk creation form, which is executed in +an asynchronous context. For the bulk creation form, ``save()`` is not called. Instead, +you can implement ``post_bulk_save(saved_vouchers)`` which may be called multiple times +for every batch persisted to the database. + As with all plugin signals, the ``sender`` keyword argument will contain the event. """ diff --git a/src/pretix/control/templates/pretixcontrol/vouchers/bulk.html b/src/pretix/control/templates/pretixcontrol/vouchers/bulk.html index e562c1e51a..6ccbaf0c25 100644 --- a/src/pretix/control/templates/pretixcontrol/vouchers/bulk.html +++ b/src/pretix/control/templates/pretixcontrol/vouchers/bulk.html @@ -5,7 +5,7 @@ {% block title %}{% trans "Voucher" %}{% endblock %} {% block inside %}

{% trans "Create multiple vouchers" %}

-
+ {% csrf_token %} {% bootstrap_form_errors form %}
diff --git a/src/pretix/control/views/vouchers.py b/src/pretix/control/views/vouchers.py index 06e1f5c8a1..07b6e0dafe 100644 --- a/src/pretix/control/views/vouchers.py +++ b/src/pretix/control/views/vouchers.py @@ -3,7 +3,8 @@ import io from defusedcsv import csv from django.conf import settings from django.contrib import messages -from django.db import transaction +from django.core.exceptions import ValidationError +from django.db import connection, transaction from django.db.models import Sum from django.http import ( Http404, HttpResponse, HttpResponseBadRequest, HttpResponseRedirect, @@ -21,7 +22,9 @@ from django.views.generic import ( from pretix.base.models import CartPosition, LogEntry, OrderPosition, Voucher from pretix.base.models.vouchers import _generate_random_code +from pretix.base.services.locking import NoLockManager from pretix.base.services.vouchers import vouchers_send +from pretix.base.views.tasks import AsyncFormView from pretix.control.forms.filter import VoucherFilterForm, VoucherTagFilterForm from pretix.control.forms.vouchers import VoucherBulkForm, VoucherForm from pretix.control.permissions import EventPermissionRequiredMixin @@ -287,13 +290,19 @@ class VoucherGo(EventPermissionRequiredMixin, View): return redirect('control:event.vouchers', event=request.event.slug, organizer=request.event.organizer.slug) -class VoucherBulkCreate(EventPermissionRequiredMixin, CreateView): +class VoucherBulkCreate(EventPermissionRequiredMixin, AsyncFormView): model = Voucher template_name = 'pretixcontrol/vouchers/bulk.html' permission = 'can_change_vouchers' context_object_name = 'voucher' - def get_success_url(self) -> str: + def get_success_url(self, value) -> str: + return reverse('control:event.vouchers', kwargs={ + 'organizer': self.request.event.organizer.slug, + 'event': self.request.event.slug, + }) + + def get_error_url(self): return reverse('control:event.vouchers', kwargs={ 'organizer': self.request.event.organizer.slug, 'event': self.request.event.slug, @@ -316,34 +325,84 @@ class VoucherBulkCreate(EventPermissionRequiredMixin, CreateView): i.redeemed = 0 kwargs['instance'] = i else: - kwargs['instance'] = Voucher(event=self.request.event) + kwargs['instance'] = Voucher(event=self.request.event, code=None) return kwargs - @transaction.atomic - def form_valid(self, form): - log_entries = [] - objs = form.save(self.request.event) + def get_async_form_kwargs(self, form_kwargs, organizer=None, event=None): + if not form_kwargs.get('instance'): + form_kwargs['instance'] = Voucher(event=self.request.event, code=None) + return form_kwargs + + def async_form_valid(self, task, form): + lockfn = NoLockManager + if form.data.get('block_quota'): + lockfn = self.request.event.lock + batch_size = 500 + total_num = 1 # will be set later + + def set_progress(percent): + if not task.request.called_directly: + task.update_state( + state='PROGRESS', + meta={'value': percent} + ) + + def process_batch(batch_vouchers, voucherids): + Voucher.objects.bulk_create(batch_vouchers) + if not connection.features.can_return_rows_from_bulk_insert: + batch_vouchers = list(self.request.event.vouchers.filter(code__in=[v.code for v in batch_vouchers])) + + log_entries = [] + for v in batch_vouchers: + voucherids.append(v.pk) + data = dict(form.cleaned_data) + data['code'] = code + data['bulk'] = True + del data['codes'] + log_entries.append( + v.log_action('pretix.voucher.added', data=data, user=self.request.user, save=False) + ) + LogEntry.objects.bulk_create(log_entries) + form.post_bulk_save(batch_vouchers) + batch_vouchers.clear() + set_progress(len(voucherids) / total_num * (50. if form.cleaned_data['send'] else 100.)) + voucherids = [] - for v in objs: - log_entries.append( - v.log_action('pretix.voucher.added', data=form.cleaned_data, user=self.request.user, save=False) - ) - voucherids.append(v.pk) - LogEntry.objects.bulk_create(log_entries, batch_size=200) + with lockfn(), transaction.atomic(): + if not form.is_valid(): + raise ValidationError(form.errors) + total_num = len(form.cleaned_data['codes']) + + batch_vouchers = [] + for code in form.cleaned_data['codes']: + if len(batch_vouchers) > batch_size: + process_batch(batch_vouchers, voucherids) + + obj = modelcopy(form.instance, code=None) + obj.event = self.request.event + obj.code = code + try: + obj.seat = form.cleaned_data['seats'].pop() + obj.item = obj.seat.product + except IndexError: + pass + batch_vouchers.append(obj) + + process_batch(batch_vouchers, voucherids) if form.cleaned_data['send']: - vouchers_send.apply_async(kwargs={ - 'event': self.request.event.pk, - 'vouchers': voucherids, - 'subject': form.cleaned_data['send_subject'], - 'message': form.cleaned_data['send_message'], - 'recipients': [r._asdict() for r in form.cleaned_data['send_recipients']], - 'user': self.request.user.pk, - }) - messages.success(self.request, _('The new vouchers have been created and will be sent out shortly.')) - else: - messages.success(self.request, _('The new vouchers have been created.')) - return HttpResponseRedirect(self.get_success_url()) + vouchers_send( + event=self.request.event, + vouchers=voucherids, + subject=form.cleaned_data['send_subject'], + message=form.cleaned_data['send_message'], + recipients=[r._asdict() for r in form.cleaned_data['send_recipients']], + user=self.request.user.pk, + progress=lambda p: set_progress(50. + p * 50.) + ) + + def get_success_message(self, value): + return _('The new vouchers have been created.') def get_form_class(self): form_class = VoucherBulkForm @@ -357,11 +416,6 @@ class VoucherBulkCreate(EventPermissionRequiredMixin, CreateView): ctx['code_length'] = settings.ENTROPY['voucher_code'] return ctx - def post(self, request, *args, **kwargs): - # TODO: Transform this into an asynchronous call? - with request.event.lock(): - return super().post(request, *args, **kwargs) - class VoucherRNG(EventPermissionRequiredMixin, View): permission = 'can_change_vouchers' diff --git a/src/pretix/helpers/models.py b/src/pretix/helpers/models.py index f40d3a0059..2451a71d08 100644 --- a/src/pretix/helpers/models.py +++ b/src/pretix/helpers/models.py @@ -12,8 +12,8 @@ class Thumbnail(models.Model): unique_together = (('source', 'size'),) -def modelcopy(obj: models.Model): - n = obj.__class__() +def modelcopy(obj: models.Model, **kwargs): + n = obj.__class__(**kwargs) for f in obj._meta.fields: val = getattr(obj, f.name) if isinstance(val, models.Model): diff --git a/src/pretix/settings.py b/src/pretix/settings.py index f53d825026..8ba5aa9e70 100644 --- a/src/pretix/settings.py +++ b/src/pretix/settings.py @@ -653,7 +653,7 @@ LOGGING = { 'django.db.backends': { 'handlers': ['file', 'console'], 'level': 'INFO', # Do not output all the queries - 'propagate': True, + 'propagate': False, }, 'asyncio': { 'handlers': ['file', 'console'],