# Mirror of https://github.com/pretix/pretix.git
# Synced 2026-05-03 14:54:04 +00:00 - 403 lines - 17 KiB - Python
import csv
|
||
from collections import namedtuple
|
||
from io import StringIO
|
||
|
||
from django import forms
|
||
from django.core.exceptions import ObjectDoesNotExist, ValidationError
|
||
from django.core.validators import EmailValidator
|
||
from django.db.models.functions import Lower
|
||
from django.urls import reverse
|
||
from django.utils.translation import gettext_lazy as _, pgettext_lazy
|
||
from django_scopes.forms import SafeModelChoiceField
|
||
|
||
from pretix.base.email import get_available_placeholders
|
||
from pretix.base.forms import I18nModelForm, PlaceholderValidator
|
||
from pretix.base.models import Item, Voucher
|
||
from pretix.control.forms import SplitDateTimeField, SplitDateTimePickerWidget
|
||
from pretix.control.forms.widgets import Select2, Select2ItemVarQuota
|
||
from pretix.control.signals import voucher_form_validation
|
||
from pretix.helpers.models import modelcopy
|
||
|
||
|
||
class FakeChoiceField(forms.ChoiceField):
    """
    A ``ChoiceField`` whose choice check accepts every value.

    ``valid_value`` unconditionally reports success, so the configured
    ``choices`` are never enforced by the field itself. Code using this field
    has to validate the submitted value on its own (``VoucherForm.clean`` does
    this for its ``itemvar`` field).
    """

    def valid_value(self, value):
        """Report any ``value`` as an acceptable choice."""
        return True
|
||
|
||
|
||
class VoucherForm(I18nModelForm):
    """
    Form for creating or editing a single voucher of an event.

    The model attributes ``item``, ``variation`` and ``quota`` are not exposed
    as individual form fields. They are represented by the synthetic
    ``itemvar`` field, whose value has one of these shapes:

    * ``"<item pk>"`` -- a product,
    * ``"<item pk>-<variation pk>"`` -- one variation of a product,
    * ``"q-<quota pk>"`` -- a quota,
    * empty -- ``item``, ``variation`` and ``quota`` are all cleared.

    :meth:`clean` parses that value and writes the result to ``self.instance``.

    The form must be constructed with an ``instance`` keyword argument whose
    ``event`` is set: ``__init__`` and ``clean`` read ``instance.event``
    unconditionally.
    """

    itemvar = FakeChoiceField(
        label=_("Product"),
        help_text=_(
            "This product is added to the user's cart if the voucher is redeemed."
        ),
        required=True
    )

    class Meta:
        model = Voucher
        localized_fields = '__all__'
        fields = [
            'code', 'valid_until', 'block_quota', 'allow_ignore_quota', 'value', 'tag',
            'comment', 'max_usages', 'price_mode', 'subevent', 'show_hidden_items', 'budget'
        ]
        field_classes = {
            'valid_until': SplitDateTimeField,
            'subevent': SafeModelChoiceField,
        }
        widgets = {
            'valid_until': SplitDateTimePickerWidget(),
        }

    def __init__(self, *args, **kwargs):
        """
        Build the form and configure the fields that depend on the event.

        Keyword arguments read here (everything is forwarded to the parent):

        * ``instance`` -- the voucher to edit (or a new one with ``event`` set).
        * ``initial`` -- optional dict of initial values. It is modified in
          place: ``itemvar`` is derived from the instance and stored in it.
        """
        instance = kwargs.get('instance')
        initial = kwargs.get('initial')
        if initial is None:
            # The code below subscripts ``initial`` and tests membership on it,
            # so make sure there is a dict and that the parent sees the same one.
            initial = kwargs['initial'] = {}

        if instance:
            # Snapshot of the unmodified voucher; clean() hands it to the
            # quota checks.
            self.initial_instance_data = modelcopy(instance)
            try:
                if instance.variation:
                    initial['itemvar'] = '%d-%d' % (instance.item.pk, instance.variation.pk)
                elif instance.item:
                    initial['itemvar'] = str(instance.item.pk)
                elif instance.quota:
                    initial['itemvar'] = 'q-%d' % instance.quota.pk
            except Item.DoesNotExist:
                pass
        else:
            self.initial_instance_data = None
        super().__init__(*args, **kwargs)

        if instance.event.has_subevents:
            self.fields['subevent'].queryset = instance.event.subevents.all()
            self.fields['subevent'].widget = Select2(
                attrs={
                    'data-model-select2': 'event',
                    'data-select2-url': reverse('control:event.subevents.select2', kwargs={
                        'event': instance.event.slug,
                        'organizer': instance.event.organizer.slug,
                    }),
                    'data-placeholder': pgettext_lazy('subevent', 'Date')
                }
            )
            self.fields['subevent'].widget.choices = self.fields['subevent'].choices
            self.fields['subevent'].required = False
        elif 'subevent' in self.fields:
            # Previously ``elif 'subevent':`` -- a non-empty string literal is
            # always true, so the field was deleted without checking that it
            # exists.
            del self.fields['subevent']

        # Only the currently selected option is rendered as a choice. The
        # widget is given a select2 URL below for the rest.
        choices = []
        if 'itemvar' in initial or (self.data and 'itemvar' in self.data):
            iv = self.data.get('itemvar') or initial.get('itemvar', '')
            try:
                if iv.startswith('q-'):
                    q = self.instance.event.quotas.get(pk=iv[2:])
                    choices.append(('q-%d' % q.pk, _('Any product in quota "{quota}"').format(quota=q)))
                elif '-' in iv:
                    itemid, varid = iv.split('-')
                    i = self.instance.event.items.get(pk=itemid)
                    v = i.variations.get(pk=varid)
                    choices.append(('%d-%d' % (i.pk, v.pk), '%s – %s' % (str(i), v.value)))
                elif iv:
                    i = self.instance.event.items.get(pk=iv)
                    if i.variations.exists():
                        choices.append((str(i.pk), _('{product} – Any variation').format(product=i)))
                    else:
                        choices.append((str(i.pk), str(i)))
            except (ObjectDoesNotExist, ValueError):
                # Submitted data may carry an unknown or malformed value. Do
                # not fail while building the form; clean() reports the
                # problem as a validation error instead.
                choices = []

        self.fields['itemvar'].choices = choices
        self.fields['itemvar'].widget = Select2ItemVarQuota(
            attrs={
                'data-model-select2': 'generic',
                'data-select2-url': reverse('control:event.vouchers.itemselect2', kwargs={
                    'event': instance.event.slug,
                    'organizer': instance.event.organizer.slug,
                }),
                'data-placeholder': _('All products')
            }
        )
        self.fields['itemvar'].required = False
        self.fields['itemvar'].widget.choices = self.fields['itemvar'].choices

        # Offer the seat field only if the event or one of its subevents has a
        # seating plan.
        if self.instance.event.seating_plan or self.instance.event.subevents.filter(seating_plan__isnull=False).exists():
            self.fields['seat'] = forms.CharField(
                label=_("Specific seat ID"),
                max_length=255,
                required=False,
                widget=forms.TextInput(attrs={'data-seat-guid-field': '1'}),
                initial=self.instance.seat.seat_guid if self.instance.seat else '',
                help_text=str(self.instance.seat) if self.instance.seat else '',
            )

    def clean(self):
        """
        Resolve ``itemvar`` onto the instance and run the voucher validations.

        If a ``codes`` entry is present in the cleaned data (as added by
        ``VoucherBulkForm``), it is split into a list of non-empty, stripped
        codes and stored back under the same key.

        :return: the cleaned data dict
        :raises ValidationError: if the selected product is unknown or
            malformed, if the voucher matches only hidden products without
            ``show_hidden_items``, or from any of the ``Voucher.clean_*``
            helpers called here.
        """
        data = super().clean()

        if not self._errors:
            try:
                itemid = varid = quotaid = None
                iv = self.data.get('itemvar', '')
                if iv.startswith('q-'):
                    quotaid = iv[2:]
                elif '-' in iv:
                    itemid, varid = iv.split('-')
                elif iv:
                    itemid = iv

                if itemid:
                    self.instance.item = self.instance.event.items.get(pk=itemid)
                    if varid:
                        self.instance.variation = self.instance.item.variations.get(pk=varid)
                    else:
                        self.instance.variation = None
                    self.instance.quota = None
                elif quotaid:
                    self.instance.quota = self.instance.event.quotas.get(pk=quotaid)
                    self.instance.item = None
                    self.instance.variation = None
                else:
                    self.instance.quota = None
                    self.instance.item = None
                    self.instance.variation = None

            except (ObjectDoesNotExist, ValueError) as err:
                # ValueError covers values with more than one dash and pks
                # that cannot be converted for the lookup.
                raise ValidationError(_("Invalid product selected.")) from err

        if 'codes' in data:
            # Strip before filtering so that "\r" left over from CRLF line
            # endings does not turn a blank line into an empty code.
            stripped_codes = (line.strip() for line in data.get('codes', '').split("\n"))
            data['codes'] = [code for code in stripped_codes if code]
            cnt = len(data['codes']) * data.get('max_usages', 0)
        else:
            cnt = data.get('max_usages', 0)

        Voucher.clean_item_properties(
            data, self.instance.event,
            self.instance.quota, self.instance.item, self.instance.variation,
            seats_given=data.get('seat') or data.get('seats'),
            block_quota=data.get('block_quota')
        )
        if not self.instance.show_hidden_items and (
            (self.instance.quota and all(i.hide_without_voucher for i in self.instance.quota.items.all()))
            or (self.instance.item and self.instance.item.hide_without_voucher)
        ):
            raise ValidationError({
                'show_hidden_items': [
                    _('The voucher only matches hidden products but you have not selected that it should show '
                      'them.')
                ]
            })
        Voucher.clean_subevent(
            data, self.instance.event
        )
        Voucher.clean_max_usages(data, self.instance.redeemed)
        check_quota = Voucher.clean_quota_needs_checking(
            data, self.initial_instance_data,
            item_changed=data.get('itemvar') != self.initial.get('itemvar'),
            creating=not self.instance.pk
        )
        if check_quota:
            Voucher.clean_quota_check(
                data, cnt, self.initial_instance_data, self.instance.event,
                self.instance.quota, self.instance.item, self.instance.variation
            )
        Voucher.clean_voucher_code(data, self.instance.event, self.instance.pk)
        if 'seat' in self.fields and data.get('seat'):
            self.instance.seat = Voucher.clean_seat_id(
                data, self.instance.item, self.instance.quota, self.instance.event, self.instance.pk
            )
            # A seat determines the product, overriding the itemvar selection.
            self.instance.item = self.instance.seat.product

        # Let signal receivers run additional validation on this form.
        voucher_form_validation.send(sender=self.instance.event, form=self, data=data)

        return data

    def save(self, commit=True):
        """Save the voucher via the parent implementation and return it."""
        return super().save(commit)
|
||
|
||
|
||
class VoucherBulkForm(VoucherForm):
    """
    Form for creating many vouchers at once, optionally emailing them.

    In addition to the settings shared with ``VoucherForm``, it takes a list
    of voucher codes (one per line), optional email settings (subject, message
    and recipients) and, if the parent form offers a ``seat`` field, a list of
    seat IDs.

    Unlike a regular model form, :meth:`save` takes the event as an argument
    and returns a list of the created vouchers.
    """

    codes = forms.CharField(
        widget=forms.Textarea,
        label=_("Codes"),
        help_text=_(
            "Add one voucher code per line. We suggest that you copy this list and save it into a file."
        ),
        required=True
    )
    send = forms.BooleanField(
        label=_("Send vouchers via email"),
        required=False
    )
    send_subject = forms.CharField(
        label=_("Subject"),
        widget=forms.TextInput(attrs={'data-display-dependency': '#id_send'}),
        required=False,
        initial=_('Your voucher for {event}')
    )
    send_message = forms.CharField(
        label=_("Message"),
        widget=forms.Textarea(attrs={'data-display-dependency': '#id_send'}),
        required=False,
        initial=_('Hello,\n\n'
                  'with this email, we\'re sending you one or more vouchers for {event}:\n\n{voucher_list}\n\n'
                  'You can redeem them here in our ticket shop:\n\n{url}\n\nBest regards,\n\n'
                  'Your {event} team')
    )
    send_recipients = forms.CharField(
        label=_('Recipients'),
        widget=forms.Textarea(attrs={
            'data-display-dependency': '#id_send',
            'placeholder': 'email,number,name,tag\njohn@example.org,3,John,example\n\n-- {} --\n\njohn@example.org\njane@example.net'.format(
                _('or')
            )
        }),
        required=False,
        help_text=_('You can either supply a list of email addresses with one email address per line, or a CSV file with a title column '
                    'and one or more of the columns "email", "number", "name", or "tag".')
    )
    # One parsed entry of ``send_recipients``; ``number`` is the amount of
    # vouchers this recipient accounts for in clean().
    Recipient = namedtuple('Recipient', 'email number name tag')

    def _set_field_placeholders(self, fn, base_parameters):
        """
        Document and enforce the email placeholders allowed in field ``fn``.

        The placeholder names come from ``get_available_placeholders`` for
        this event and ``base_parameters``. They are appended to the field's
        help text and enforced with a ``PlaceholderValidator``.
        """
        phs = [
            '{%s}' % p
            for p in sorted(get_available_placeholders(self.instance.event, base_parameters).keys())
        ]
        ht = _('Available placeholders: {list}').format(
            list=', '.join(phs)
        )
        if self.fields[fn].help_text:
            self.fields[fn].help_text += ' ' + str(ht)
        else:
            self.fields[fn].help_text = ht
        self.fields[fn].validators.append(
            PlaceholderValidator(phs)
        )

    class Meta:
        model = Voucher
        localized_fields = '__all__'
        fields = [
            'valid_until', 'block_quota', 'allow_ignore_quota', 'value', 'tag', 'comment',
            'max_usages', 'price_mode', 'subevent', 'show_hidden_items', 'budget'
        ]
        field_classes = {
            'valid_until': SplitDateTimeField,
            'subevent': SafeModelChoiceField,
        }
        widgets = {
            'valid_until': SplitDateTimePickerWidget(),
        }
        labels = {
            'max_usages': _('Maximum usages per voucher')
        }
        help_texts = {
            # NOTE(review): "times times" looks like a typo, but the literal is
            # a translation msgid, so it is intentionally left unchanged here.
            'max_usages': _('Number of times times EACH of these vouchers can be redeemed.')
        }

    def __init__(self, *args, **kwargs):
        """
        Extend the parent form with placeholder help and a ``seats`` field.

        The ``seats`` textarea is added only when the parent form created a
        ``seat`` field.
        """
        super().__init__(*args, **kwargs)
        self._set_field_placeholders('send_subject', ['event', 'name'])
        self._set_field_placeholders('send_message', ['event', 'voucher_list', 'name'])
        if 'seat' in self.fields:
            self.fields['seats'] = forms.CharField(
                label=_("Specific seat IDs"),
                required=False,
                widget=forms.Textarea(attrs={'data-seat-guid-field': '1'}),
                initial=self.instance.seat.seat_guid if self.instance.seat else '',
            )

    def clean_send_recipients(self):
        """
        Parse the recipients textarea into a list of ``Recipient`` tuples.

        Input containing ``,`` or ``;`` is treated as CSV with a header row.
        The header must include ``email`` and may include ``number``, ``name``
        and ``tag``. Any other input is read as one email address per line,
        ignoring blank lines.

        :return: list of ``Recipient``; empty if the field was left blank
        :raises ValidationError: for unparseable CSV, a missing header row,
            a missing ``email`` column, unknown columns, invalid email
            addresses or a non-integer ``number``.
        """
        raw = self.cleaned_data['send_recipients']
        if not raw:
            return []
        r = raw.split('\n')
        res = []
        validate_email = EmailValidator()
        if ',' in raw or ';' in raw:
            if '@' in r[0]:
                raise ValidationError(_('CSV input needs to contain a header row in the first line.'))
            try:
                dialect = csv.Sniffer().sniff(raw[:1024])
            except csv.Error as err:
                # The sniffer raises csv.Error when it cannot determine the
                # dialect; report that to the user instead of crashing.
                raise ValidationError(_('CSV input could not be parsed.')) from err
            reader = csv.DictReader(StringIO(raw), dialect=dialect)
            fieldnames = reader.fieldnames or []
            if 'email' not in fieldnames:
                raise ValidationError(_('CSV input needs to contain a field with the header "{header}".').format(header="email"))
            unknown_fields = [f for f in fieldnames if f not in ('email', 'name', 'tag', 'number')]
            if unknown_fields:
                raise ValidationError(_('CSV input contains an unknown field with the header "{header}".').format(header=unknown_fields[0]))
            for i, row in enumerate(reader):
                # Validate the same stripped value that gets stored. A short
                # row yields None for missing columns.
                email = (row['email'] or '').strip()
                try:
                    validate_email(email)
                except ValidationError as err:
                    raise ValidationError(_('{value} is not a valid email address.').format(value=email)) from err
                try:
                    res.append(self.Recipient(
                        name=row.get('name', ''),
                        email=email,
                        number=int(row.get('number', 1)),
                        tag=row.get('tag', None)
                    ))
                except (TypeError, ValueError) as err:
                    # TypeError: ``number`` column present but missing in this
                    # row (None); ValueError: not an integer literal.
                    raise ValidationError(_('Invalid value in row {number}.').format(number=i + 1)) from err
        else:
            for e in r:
                email = e.strip()
                if not email:
                    # Tolerate blank lines between addresses.
                    continue
                try:
                    validate_email(email)
                except ValidationError as err:
                    raise ValidationError(_('{value} is not a valid email address.').format(value=email)) from err
                res.append(self.Recipient(email=email, number=1, tag=None, name=''))
        return res

    def clean(self):
        """
        Run the parent validation plus the bulk-specific checks.

        Checks that none of the codes exists yet for the event (compared
        case-insensitively), that the email settings are complete when sending
        is requested, that the recipients account for exactly as many vouchers
        as there are codes, and that there is one seat per code if seats are
        given. ``data['seats']`` is replaced by the list of objects returned
        by ``Voucher.clean_seat_id`` (empty if no seats were given).

        :return: the cleaned data dict
        :raises ValidationError: if any of the checks above fails
        """
        data = super().clean()
        # ``codes`` is absent if the field itself failed validation.
        codes = data.get('codes') or []

        vouchers = self.instance.event.vouchers.annotate(
            code_lower=Lower('code')
        ).filter(code_lower__in=[c.lower() for c in codes])
        if vouchers.exists():
            raise ValidationError(_('A voucher with one of these codes already exists.'))

        if data.get('send') and not all([data.get('send_subject'), data.get('send_message'), data.get('send_recipients')]):
            raise ValidationError(_('If vouchers should be sent by email, subject, message and recipients need to be specified.'))

        if codes and data.get('send'):
            recp = self.cleaned_data.get('send_recipients', [])
            code_len = len(codes)
            recp_len = sum(r.number for r in recp)
            if code_len != recp_len:
                raise ValidationError(_('You generated {codes} vouchers, but entered recipients for {recp} vouchers.').format(codes=code_len, recp=recp_len))

        if data.get('seats'):
            # Strip before filtering so that "\r"-only lines are dropped.
            stripped_ids = (line.strip() for line in data.get('seats').split("\n"))
            seatids = [s for s in stripped_ids if s]
            if len(seatids) != len(codes):
                raise ValidationError(_('You need to specify as many seats as voucher codes.'))
            data['seats'] = []
            for s in seatids:
                # clean_seat_id is passed ``data``, so expose the current ID
                # under the ``seat`` key.
                data['seat'] = s
                data['seats'].append(Voucher.clean_seat_id(
                    data, self.instance.item, self.instance.quota, self.instance.event, None
                ))
            if data['seats']:
                self.instance.seat = data['seats'][0]  # Trick model-level validation
        else:
            data['seats'] = []

        return data

    def save(self, event, *args, **kwargs):
        """
        Create one voucher per code for ``event``.

        Each voucher is a copy of ``self.instance`` with its own code. While
        cleaned seats remain, one is attached to the voucher and the voucher's
        item is set to the seat's product.

        :param event: the event the vouchers are created for
        :return: list of the created vouchers, re-read from the database
        """
        seats = self.cleaned_data.get('seats', [])
        objs = []
        for code in self.cleaned_data['codes']:
            obj = modelcopy(self.instance)
            obj.event = event
            obj.code = code
            if seats:
                # NOTE(review): pop() takes from the end of the list, so the
                # first code is paired with the last seat. Kept as-is; confirm
                # whether this ordering is intended.
                obj.seat = seats.pop()
                obj.item = obj.seat.product
            objs.append(obj)
        Voucher.objects.bulk_create(objs, batch_size=200)
        # We need to query them again as bulk_create does not fill in .pk values on databases
        # other than PostgreSQL
        return list(event.vouchers.filter(code__in=self.cleaned_data['codes']))
|