Voucher bulk creation: More efficient implementation and async task

This commit is contained in:
Raphael Michel
2021-03-14 18:19:49 +01:00
parent 7eefd3dc59
commit 4a2ac110b3
8 changed files with 244 additions and 103 deletions

View File

@@ -3,22 +3,21 @@ from i18nfield.strings import LazyI18nString
from pretix.base.email import get_email_context
from pretix.base.i18n import language
from pretix.base.models import Event, User, Voucher
from pretix.base.models import Event, LogEntry, User, Voucher
from pretix.base.services.mail import mail
from pretix.base.services.tasks import TransactionAwareProfiledEventTask
from pretix.celery_app import app
@app.task(base=TransactionAwareProfiledEventTask, acks_late=True)
def vouchers_send(event: Event, vouchers: list, subject: str, message: str, recipients: list, user: int) -> None:
def vouchers_send(event: Event, vouchers: list, subject: str, message: str, recipients: list, user: int,
progress=None) -> None:
vouchers = list(Voucher.objects.filter(id__in=vouchers).order_by('id'))
user = User.objects.get(pk=user)
for r in recipients:
for ir, r in enumerate(recipients):
voucher_list = []
for i in range(r['number']):
voucher_list.append(vouchers.pop())
with language(event.settings.locale):
email_context = get_email_context(event=event, name=r.get('name') or '', voucher_list=[v.code for v in voucher_list])
email_context = get_email_context(event=event, name=r.get('name') or '',
voucher_list=[v.code for v in voucher_list])
mail(
r['email'],
subject,
@@ -27,14 +26,14 @@ def vouchers_send(event: Event, vouchers: list, subject: str, message: str, reci
event,
locale=event.settings.locale,
)
logs = []
for v in voucher_list:
if r.get('tag') and r.get('tag') != v.tag:
v.tag = r.get('tag')
if v.comment:
v.comment += '\n\n'
v.comment = gettext('The voucher has been sent to {recipient}.').format(recipient=r['email'])
v.save(update_fields=['tag', 'comment'])
v.log_action(
logs.append(v.log_action(
'pretix.voucher.sent',
user=user,
data={
@@ -42,5 +41,11 @@ def vouchers_send(event: Event, vouchers: list, subject: str, message: str, reci
'name': r.get('name'),
'subject': subject,
'message': message,
}
)
},
save=False
))
Voucher.objects.bulk_update(voucher_list, fields=['comment', 'tag'], batch_size=500)
LogEntry.objects.bulk_create(logs, batch_size=500)
if progress and ir % 50 == 0:
progress(ir / len(recipients))

View File

@@ -4,43 +4,25 @@ import celery.exceptions
from celery.result import AsyncResult
from django.conf import settings
from django.contrib import messages
from django.core.exceptions import ValidationError
from django.http import JsonResponse
from django.shortcuts import redirect, render
from django.test import RequestFactory
from django.utils.translation import gettext as _
from django.views.generic import FormView
from pretix.base.models import User
from pretix.base.services.tasks import ProfiledEventTask
from pretix.celery_app import app
logger = logging.getLogger('pretix.base.tasks')
class AsyncAction:
task = None
class AsyncMixin:
success_url = None
error_url = None
known_errortypes = []
def do(self, *args, **kwargs):
if not isinstance(self.task, app.Task):
raise TypeError('Method has no task attached')
try:
res = self.task.apply_async(args=args, kwargs=kwargs)
except ConnectionError:
# Task very likely not yet sent, due to redis restarting etc. Let's try once agan
res = self.task.apply_async(args=args, kwargs=kwargs)
if 'ajax' in self.request.GET or 'ajax' in self.request.POST:
data = self._return_ajax_result(res)
data['check_url'] = self.get_check_url(res.id, True)
return JsonResponse(data)
else:
if res.ready():
if res.successful() and not isinstance(res.info, Exception):
return self.success(res.info)
else:
return self.error(res.info)
return redirect(self.get_check_url(res.id, False))
def get_success_url(self, value):
return self.success_url
@@ -50,11 +32,6 @@ class AsyncAction:
def get_check_url(self, task_id, ajax):
return self.request.path + '?async_id=%s' % task_id + ('&ajax=1' if ajax else '')
def get(self, request, *args, **kwargs):
if 'async_id' in request.GET and settings.HAS_CELERY:
return self.get_result(request)
return self.http_method_not_allowed(request)
def _ajax_response_data(self):
return {}
@@ -86,7 +63,7 @@ class AsyncAction:
if smes:
messages.success(self.request, smes)
# TODO: Do not store message if the ajax client states that it will not redirect
# but handle the mssage itself
# but handle the message itself
data.update({
'redirect': self.get_success_url(res.info),
'success': True,
@@ -95,7 +72,7 @@ class AsyncAction:
else:
messages.error(self.request, self.get_error_message(res.info))
# TODO: Do not store message if the ajax client states that it will not redirect
# but handle the mssage itself
# but handle the message itself
data.update({
'redirect': self.get_error_url(),
'success': False,
@@ -159,3 +136,124 @@ class AsyncAction:
def get_success_message(self, value):
return _('The task has been completed.')
class AsyncAction(AsyncMixin):
task = None
def do(self, *args, **kwargs):
if not isinstance(self.task, app.Task):
raise TypeError('Method has no task attached')
try:
res = self.task.apply_async(args=args, kwargs=kwargs)
except ConnectionError:
# Task very likely not yet sent, due to redis restarting etc. Let's try once again
res = self.task.apply_async(args=args, kwargs=kwargs)
if 'ajax' in self.request.GET or 'ajax' in self.request.POST:
data = self._return_ajax_result(res)
data['check_url'] = self.get_check_url(res.id, True)
return JsonResponse(data)
else:
if res.ready():
if res.successful() and not isinstance(res.info, Exception):
return self.success(res.info)
else:
return self.error(res.info)
return redirect(self.get_check_url(res.id, False))
def get(self, request, *args, **kwargs):
if 'async_id' in request.GET and settings.HAS_CELERY:
return self.get_result(request)
return self.http_method_not_allowed(request)
class AsyncFormView(AsyncMixin, FormView):
"""
FormView variant in which instead of ``form_valid``, an ``async_form_valid``
is executed in a celery task. Note that this places some severe limitations
on the form and the view, e.g. neither ``get_form*`` nor the form itself
may depend on the request object unless specifically supported by this class.
Also, all form keyword arguments except ``instance`` need to be serializable.
"""
known_errortypes = ['ValidationError']
def __init_subclass__(cls):
def async_execute(self, request_path, form_kwargs, organizer=None, event=None, user=None):
view_instance = cls()
view_instance.request = RequestFactory().post(request_path)
if organizer:
view_instance.request.event = event
if organizer:
view_instance.request.organizer = organizer
if user:
view_instance.request.user = User.objects.get(pk=user)
form_class = view_instance.get_form_class()
if form_kwargs.get('instance'):
cls.model.objects.get(pk=form_kwargs['instance'])
form_kwargs = view_instance.get_async_form_kwargs(form_kwargs, organizer, event)
form = form_class(**form_kwargs)
return view_instance.async_form_valid(self, form)
cls.async_execute = app.task(
base=ProfiledEventTask,
bind=True,
name=cls.__module__ + '.' + cls.__name__ + '.async_execute',
throws=(ValidationError,)
)(async_execute)
def async_form_valid(self, task, form):
pass
def get_async_form_kwargs(self, form_kwargs, organizer=None, event=None):
return form_kwargs
def get(self, request, *args, **kwargs):
if 'async_id' in request.GET and settings.HAS_CELERY:
return self.get_result(request)
return super().get(request, *args, **kwargs)
def form_valid(self, form):
if form.files:
raise TypeError('File upload currently not supported in AsyncFormView')
form_kwargs = {
k: v for k, v in self.get_form_kwargs().items()
}
if form_kwargs.get('instance'):
if form_kwargs['instance'].pk:
form_kwargs['instance'] = form_kwargs['instance'].pk
else:
form_kwargs['instance'] = None
form_kwargs.setdefault('data', {})
kwargs = {
'request_path': self.request.path,
'form_kwargs': form_kwargs,
}
if hasattr(self.request, 'organizer'):
kwargs['organizer'] = self.request.organizer.pk
if self.request.user.is_authenticated:
kwargs['user'] = self.request.user.pk
if hasattr(self.request, 'event'):
kwargs['event'] = self.request.event.pk
try:
res = type(self).async_execute.apply_async(kwargs=kwargs)
except ConnectionError:
# Task very likely not yet sent, due to redis restarting etc. Let's try once again
res = type(self).async_execute.apply_async(kwargs=kwargs)
if 'ajax' in self.request.GET or 'ajax' in self.request.POST:
data = self._return_ajax_result(res)
data['check_url'] = self.get_check_url(res.id, True)
return JsonResponse(data)
else:
if res.ready():
if res.successful() and not isinstance(res.info, Exception):
return self.success(res.info)
else:
return self.error(res.info)
return redirect(self.get_check_url(res.id, False))

View File

@@ -5,7 +5,7 @@ from io import StringIO
from django import forms
from django.core.exceptions import ObjectDoesNotExist, ValidationError
from django.core.validators import EmailValidator
from django.db.models.functions import Lower
from django.db.models.functions import Upper
from django.urls import reverse
from django.utils.translation import gettext_lazy as _, pgettext_lazy
from django_scopes.forms import SafeModelChoiceField
@@ -346,8 +346,8 @@ class VoucherBulkForm(VoucherForm):
data = super().clean()
vouchers = self.instance.event.vouchers.annotate(
code_lower=Lower('code')
).filter(code_lower__in=[c.lower() for c in data['codes']])
code_upper=Upper('code')
).filter(code_upper__in=[c.upper() for c in data['codes']])
if vouchers.exists():
raise ValidationError(_('A voucher with one of these codes already exists.'))
@@ -377,26 +377,5 @@ class VoucherBulkForm(VoucherForm):
return data
def save(self, event, *args, **kwargs):
objs = []
for code in self.cleaned_data['codes']:
obj = modelcopy(self.instance)
obj.event = event
obj.code = code
try:
obj.seat = self.cleaned_data['seats'].pop()
obj.item = obj.seat.product
except IndexError:
pass
data = dict(self.cleaned_data)
data['code'] = code
data['bulk'] = True
del data['codes']
objs.append(obj)
Voucher.objects.bulk_create(objs, batch_size=200)
objs = []
for v in event.vouchers.filter(code__in=self.cleaned_data['codes']):
# We need to query them again as bulk_create does not fill in .pk values on databases
# other than PostgreSQL
objs.append(v)
return objs
def post_bulk_save(self, objs):
pass

View File

@@ -154,6 +154,11 @@ This signal allows you to replace the form class that is used for modifying vouc
You will receive the default form class (or the class set by a previous plugin) in the
``cls`` argument so that you can inherit from it.
Note that this is also called for the voucher bulk creation form, which is executed in
an asynchronous context. For the bulk creation form, ``save()`` is not called. Instead,
you can implement ``post_bulk_save(saved_vouchers)`` which may be called multiple times
for every batch persisted to the database.
As with all plugin signals, the ``sender`` keyword argument will contain the event.
"""

View File

@@ -5,7 +5,7 @@
{% block title %}{% trans "Voucher" %}{% endblock %}
{% block inside %}
<h1>{% trans "Create multiple vouchers" %}</h1>
<form action="" method="post" class="form-horizontal">
<form action="" method="post" class="form-horizontal" data-asynctask>
{% csrf_token %}
{% bootstrap_form_errors form %}
<fieldset>

View File

@@ -3,7 +3,8 @@ import io
from defusedcsv import csv
from django.conf import settings
from django.contrib import messages
from django.db import transaction
from django.core.exceptions import ValidationError
from django.db import connection, transaction
from django.db.models import Sum
from django.http import (
Http404, HttpResponse, HttpResponseBadRequest, HttpResponseRedirect,
@@ -21,7 +22,9 @@ from django.views.generic import (
from pretix.base.models import CartPosition, LogEntry, OrderPosition, Voucher
from pretix.base.models.vouchers import _generate_random_code
from pretix.base.services.locking import NoLockManager
from pretix.base.services.vouchers import vouchers_send
from pretix.base.views.tasks import AsyncFormView
from pretix.control.forms.filter import VoucherFilterForm, VoucherTagFilterForm
from pretix.control.forms.vouchers import VoucherBulkForm, VoucherForm
from pretix.control.permissions import EventPermissionRequiredMixin
@@ -287,13 +290,19 @@ class VoucherGo(EventPermissionRequiredMixin, View):
return redirect('control:event.vouchers', event=request.event.slug, organizer=request.event.organizer.slug)
class VoucherBulkCreate(EventPermissionRequiredMixin, CreateView):
class VoucherBulkCreate(EventPermissionRequiredMixin, AsyncFormView):
model = Voucher
template_name = 'pretixcontrol/vouchers/bulk.html'
permission = 'can_change_vouchers'
context_object_name = 'voucher'
def get_success_url(self) -> str:
def get_success_url(self, value) -> str:
return reverse('control:event.vouchers', kwargs={
'organizer': self.request.event.organizer.slug,
'event': self.request.event.slug,
})
def get_error_url(self):
return reverse('control:event.vouchers', kwargs={
'organizer': self.request.event.organizer.slug,
'event': self.request.event.slug,
@@ -316,34 +325,84 @@ class VoucherBulkCreate(EventPermissionRequiredMixin, CreateView):
i.redeemed = 0
kwargs['instance'] = i
else:
kwargs['instance'] = Voucher(event=self.request.event)
kwargs['instance'] = Voucher(event=self.request.event, code=None)
return kwargs
@transaction.atomic
def form_valid(self, form):
log_entries = []
objs = form.save(self.request.event)
def get_async_form_kwargs(self, form_kwargs, organizer=None, event=None):
if not form_kwargs.get('instance'):
form_kwargs['instance'] = Voucher(event=self.request.event, code=None)
return form_kwargs
def async_form_valid(self, task, form):
lockfn = NoLockManager
if form.data.get('block_quota'):
lockfn = self.request.event.lock
batch_size = 500
total_num = 1 # will be set later
def set_progress(percent):
if not task.request.called_directly:
task.update_state(
state='PROGRESS',
meta={'value': percent}
)
def process_batch(batch_vouchers, voucherids):
Voucher.objects.bulk_create(batch_vouchers)
if not connection.features.can_return_rows_from_bulk_insert:
batch_vouchers = list(self.request.event.vouchers.filter(code__in=[v.code for v in batch_vouchers]))
log_entries = []
for v in batch_vouchers:
voucherids.append(v.pk)
data = dict(form.cleaned_data)
data['code'] = code
data['bulk'] = True
del data['codes']
log_entries.append(
v.log_action('pretix.voucher.added', data=data, user=self.request.user, save=False)
)
LogEntry.objects.bulk_create(log_entries)
form.post_bulk_save(batch_vouchers)
batch_vouchers.clear()
set_progress(len(voucherids) / total_num * (50. if form.cleaned_data['send'] else 100.))
voucherids = []
for v in objs:
log_entries.append(
v.log_action('pretix.voucher.added', data=form.cleaned_data, user=self.request.user, save=False)
)
voucherids.append(v.pk)
LogEntry.objects.bulk_create(log_entries, batch_size=200)
with lockfn(), transaction.atomic():
if not form.is_valid():
raise ValidationError(form.errors)
total_num = len(form.cleaned_data['codes'])
batch_vouchers = []
for code in form.cleaned_data['codes']:
if len(batch_vouchers) > batch_size:
process_batch(batch_vouchers, voucherids)
obj = modelcopy(form.instance, code=None)
obj.event = self.request.event
obj.code = code
try:
obj.seat = form.cleaned_data['seats'].pop()
obj.item = obj.seat.product
except IndexError:
pass
batch_vouchers.append(obj)
process_batch(batch_vouchers, voucherids)
if form.cleaned_data['send']:
vouchers_send.apply_async(kwargs={
'event': self.request.event.pk,
'vouchers': voucherids,
'subject': form.cleaned_data['send_subject'],
'message': form.cleaned_data['send_message'],
'recipients': [r._asdict() for r in form.cleaned_data['send_recipients']],
'user': self.request.user.pk,
})
messages.success(self.request, _('The new vouchers have been created and will be sent out shortly.'))
else:
messages.success(self.request, _('The new vouchers have been created.'))
return HttpResponseRedirect(self.get_success_url())
vouchers_send(
event=self.request.event,
vouchers=voucherids,
subject=form.cleaned_data['send_subject'],
message=form.cleaned_data['send_message'],
recipients=[r._asdict() for r in form.cleaned_data['send_recipients']],
user=self.request.user.pk,
progress=lambda p: set_progress(50. + p * 50.)
)
def get_success_message(self, value):
return _('The new vouchers have been created.')
def get_form_class(self):
form_class = VoucherBulkForm
@@ -357,11 +416,6 @@ class VoucherBulkCreate(EventPermissionRequiredMixin, CreateView):
ctx['code_length'] = settings.ENTROPY['voucher_code']
return ctx
def post(self, request, *args, **kwargs):
# TODO: Transform this into an asynchronous call?
with request.event.lock():
return super().post(request, *args, **kwargs)
class VoucherRNG(EventPermissionRequiredMixin, View):
permission = 'can_change_vouchers'

View File

@@ -12,8 +12,8 @@ class Thumbnail(models.Model):
unique_together = (('source', 'size'),)
def modelcopy(obj: models.Model):
n = obj.__class__()
def modelcopy(obj: models.Model, **kwargs):
n = obj.__class__(**kwargs)
for f in obj._meta.fields:
val = getattr(obj, f.name)
if isinstance(val, models.Model):

View File

@@ -653,7 +653,7 @@ LOGGING = {
'django.db.backends': {
'handlers': ['file', 'console'],
'level': 'INFO', # Do not output all the queries
'propagate': True,
'propagate': False,
},
'asyncio': {
'handlers': ['file', 'console'],