forked from CGM_Public/pretix_original
Allow to import orders (#1516)
* Allow to import orders * seats, subevents * Plugin support * Add docs * Warn about lack of quota handling * Control interface test * Test skeleton * First tests for the impotr columns * Add tests for all columns * Fix question validation
This commit is contained in:
@@ -13,7 +13,7 @@ class PretixBaseConfig(AppConfig):
|
||||
from . import invoice # NOQA
|
||||
from . import notifications # NOQA
|
||||
from . import email # NOQA
|
||||
from .services import auth, checkin, export, mail, tickets, cart, orders, invoices, cleanup, update_check, quotas, notifications, vouchers # NOQA
|
||||
from .services import auth, checkin, export, mail, tickets, cart, orderimport, orders, invoices, cleanup, update_check, quotas, notifications, vouchers # NOQA
|
||||
|
||||
try:
|
||||
from .celery_app import app as celery_app # NOQA
|
||||
|
||||
@@ -1106,17 +1106,25 @@ class Question(LoggedModel):
|
||||
|
||||
if self.type == Question.TYPE_CHOICE:
|
||||
try:
|
||||
return self.options.get(pk=answer)
|
||||
return self.options.get(Q(pk=answer) | Q(identifier=answer))
|
||||
except:
|
||||
raise ValidationError(_('Invalid option selected.'))
|
||||
elif self.type == Question.TYPE_CHOICE_MULTIPLE:
|
||||
try:
|
||||
if isinstance(answer, str):
|
||||
return list(self.options.filter(pk__in=answer.split(",")))
|
||||
else:
|
||||
return list(self.options.filter(pk__in=answer))
|
||||
except:
|
||||
if isinstance(answer, str):
|
||||
l_ = list(self.options.filter(
|
||||
Q(pk__in=[a for a in answer.split(",") if a.isdigit()]) |
|
||||
Q(identifier__in=answer.split(","))
|
||||
))
|
||||
llen = len(answer.split(','))
|
||||
else:
|
||||
l_ = list(self.options.filter(
|
||||
Q(pk__in=[a for a in answer if isinstance(a, int) or a.isdigit()]) |
|
||||
Q(identifier__in=answer)
|
||||
))
|
||||
llen = len(answer)
|
||||
if len(l_) != llen:
|
||||
raise ValidationError(_('Invalid option selected.'))
|
||||
return l_
|
||||
elif self.type == Question.TYPE_BOOLEAN:
|
||||
return answer in ('true', 'True', True)
|
||||
elif self.type == Question.TYPE_NUMBER:
|
||||
|
||||
@@ -2033,7 +2033,7 @@ class InvoiceAddress(models.Model):
|
||||
internal_reference = models.TextField(
|
||||
verbose_name=_('Internal reference'),
|
||||
help_text=_('This reference will be printed on your invoice for your convenience.'),
|
||||
blank=True
|
||||
blank=True,
|
||||
)
|
||||
beneficiary = models.TextField(
|
||||
verbose_name=_('Beneficiary'),
|
||||
|
||||
612
src/pretix/base/orderimport.py
Normal file
612
src/pretix/base/orderimport.py
Normal file
@@ -0,0 +1,612 @@
|
||||
import re
|
||||
from decimal import Decimal, DecimalException
|
||||
|
||||
import pycountry
|
||||
from django.conf import settings
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.core.validators import EmailValidator
|
||||
from django.utils import formats
|
||||
from django.utils.functional import cached_property
|
||||
from django.utils.translation import (
|
||||
gettext as _, gettext_lazy, pgettext, pgettext_lazy,
|
||||
)
|
||||
from django_countries import countries
|
||||
from django_countries.fields import Country
|
||||
|
||||
from pretix.base.channels import get_all_sales_channels
|
||||
from pretix.base.forms.questions import guess_country
|
||||
from pretix.base.models import (
|
||||
ItemVariation, OrderPosition, QuestionAnswer, QuestionOption, Seat,
|
||||
)
|
||||
from pretix.base.services.pricing import get_price
|
||||
from pretix.base.settings import (
|
||||
COUNTRIES_WITH_STATE_IN_ADDRESS, PERSON_NAME_SCHEMES,
|
||||
)
|
||||
from pretix.base.signals import order_import_columns
|
||||
|
||||
|
||||
class ImportColumn:
|
||||
@property
|
||||
def identifier(self):
|
||||
"""
|
||||
Unique, internal name of the column.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@property
|
||||
def verbose_name(self):
|
||||
"""
|
||||
Human-readable description of the column
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@property
|
||||
def initial(self):
|
||||
"""
|
||||
Initial value for the form component
|
||||
"""
|
||||
return None
|
||||
|
||||
@property
|
||||
def default_value(self):
|
||||
"""
|
||||
Internal default value for the assignment of this column. Defaults to ``empty``. Return ``None`` to disable this
|
||||
option.
|
||||
"""
|
||||
return 'empty'
|
||||
|
||||
@property
|
||||
def default_label(self):
|
||||
"""
|
||||
Human-readable description of the default assignment of this column, defaults to "Keep empty".
|
||||
"""
|
||||
return gettext_lazy('Keep empty')
|
||||
|
||||
def __init__(self, event):
|
||||
self.event = event
|
||||
|
||||
def static_choices(self):
|
||||
"""
|
||||
This will be called when rendering the form component and allows you to return a list of values that can be
|
||||
selected by the user statically during import.
|
||||
|
||||
:return: list of 2-tuples of strings
|
||||
"""
|
||||
return []
|
||||
|
||||
def resolve(self, settings, record):
|
||||
"""
|
||||
This method will be called to get the raw value for this field, usually by either using a static value or
|
||||
inspecting the CSV file for the assigned header. You usually do not need to implement this on your own,
|
||||
the default should be fine.
|
||||
"""
|
||||
k = settings.get(self.identifier, self.default_value)
|
||||
if k == self.default_value:
|
||||
return None
|
||||
elif k.startswith('csv:'):
|
||||
return record.get(k[4:], None) or None
|
||||
elif k.startswith('static:'):
|
||||
return k[7:]
|
||||
raise ValidationError(_('Invalid setting for column "{header}".').format(header=self.verbose_name))
|
||||
|
||||
def clean(self, value, previous_values):
|
||||
"""
|
||||
Allows you to validate the raw input value for your column. Raise ``ValidationError`` if the value is invalid.
|
||||
You do not need to include the column or row name or value in the error message as it will automatically be
|
||||
included.
|
||||
|
||||
:param value: Contains the raw value of your column as returned by ``resolve``. This can usually be ``None``,
|
||||
e.g. if the column is empty or does not exist in this row.
|
||||
:param previous_values: Dictionary containing the validated values of all columns that have already been validated.
|
||||
"""
|
||||
return value
|
||||
|
||||
def assign(self, value, order, position, invoice_address, **kwargs):
|
||||
"""
|
||||
This will be called to perform the actual import. You are supposed to set attributes on the ``order``, ``position``,
|
||||
or ``invoice_address`` objects based on the input ``value``. This is called *before* the actual database
|
||||
transaction, so these three objects do not yet have a primary key. If you want to create related objects, you
|
||||
need to place them into some sort of internal queue and persist them when ``save`` is called.
|
||||
"""
|
||||
pass
|
||||
|
||||
def save(self, order):
|
||||
"""
|
||||
This will be called to perform the actual import. This is called inside the actual database transaction and the
|
||||
input object ``order`` has already been saved to the database.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class EmailColumn(ImportColumn):
|
||||
identifier = 'email'
|
||||
verbose_name = gettext_lazy('E-mail address')
|
||||
|
||||
def clean(self, value, previous_values):
|
||||
if value:
|
||||
EmailValidator()(value)
|
||||
return value
|
||||
|
||||
def assign(self, value, order, position, invoice_address, **kwargs):
|
||||
order.email = value
|
||||
|
||||
|
||||
class SubeventColumn(ImportColumn):
|
||||
identifier = 'subevent'
|
||||
verbose_name = pgettext_lazy('subevents', 'Date')
|
||||
default_value = None
|
||||
|
||||
@cached_property
|
||||
def subevents(self):
|
||||
return list(self.event.subevents.filter(active=True).order_by('date_from'))
|
||||
|
||||
def static_choices(self):
|
||||
return [
|
||||
(str(p.pk), str(p)) for p in self.subevents
|
||||
]
|
||||
|
||||
def clean(self, value, previous_values):
|
||||
if not value:
|
||||
raise ValidationError(pgettext("subevent", "You need to select a date."))
|
||||
matches = [
|
||||
p for p in self.subevents
|
||||
if str(p.pk) == value or any(
|
||||
(v and v == value) for v in i18n_flat(p.name)) or p.date_from.isoformat() == value
|
||||
]
|
||||
if len(matches) == 0:
|
||||
raise ValidationError(pgettext("subevent", "No matching date was found."))
|
||||
if len(matches) > 1:
|
||||
raise ValidationError(pgettext("subevent", "Multiple matching dates were found."))
|
||||
return matches[0]
|
||||
|
||||
def assign(self, value, order, position, invoice_address, **kwargs):
|
||||
position.subevent = value
|
||||
|
||||
|
||||
def i18n_flat(l):
|
||||
if isinstance(l.data, dict):
|
||||
return l.data.values()
|
||||
return [l.data]
|
||||
|
||||
|
||||
class ItemColumn(ImportColumn):
|
||||
identifier = 'item'
|
||||
verbose_name = gettext_lazy('Product')
|
||||
default_value = None
|
||||
|
||||
@cached_property
|
||||
def items(self):
|
||||
return list(self.event.items.filter(active=True))
|
||||
|
||||
def static_choices(self):
|
||||
return [
|
||||
(str(p.pk), str(p)) for p in self.items
|
||||
]
|
||||
|
||||
def clean(self, value, previous_values):
|
||||
matches = [
|
||||
p for p in self.items
|
||||
if str(p.pk) == value or (p.internal_name and p.internal_name == value) or any(
|
||||
(v and v == value) for v in i18n_flat(p.name))
|
||||
]
|
||||
if len(matches) == 0:
|
||||
raise ValidationError(_("No matching product was found."))
|
||||
if len(matches) > 1:
|
||||
raise ValidationError(_("Multiple matching products were found."))
|
||||
return matches[0]
|
||||
|
||||
def assign(self, value, order, position, invoice_address, **kwargs):
|
||||
position.item = value
|
||||
|
||||
|
||||
class Variation(ImportColumn):
|
||||
identifier = 'variation'
|
||||
verbose_name = gettext_lazy('Product variation')
|
||||
|
||||
@cached_property
|
||||
def items(self):
|
||||
return list(ItemVariation.objects.filter(
|
||||
active=True, item__active=True, item__event=self.event
|
||||
).select_related('item'))
|
||||
|
||||
def static_choices(self):
|
||||
return [
|
||||
(str(p.pk), '{} – {}'.format(p.item, p.value)) for p in self.items
|
||||
]
|
||||
|
||||
def clean(self, value, previous_values):
|
||||
if value:
|
||||
matches = [
|
||||
p for p in self.items
|
||||
if str(p.pk) == value or any((v and v == value) for v in i18n_flat(p.value)) and p.item_id == previous_values['item'].pk
|
||||
]
|
||||
if len(matches) == 0:
|
||||
raise ValidationError(_("No matching variation was found."))
|
||||
if len(matches) > 1:
|
||||
raise ValidationError(_("Multiple matching variations were found."))
|
||||
return matches[0]
|
||||
elif previous_values['item'].variations.exists():
|
||||
raise ValidationError(_("You need to select a variation for this product."))
|
||||
return value
|
||||
|
||||
def assign(self, value, order, position, invoice_address, **kwargs):
|
||||
position.variation = value
|
||||
|
||||
|
||||
class InvoiceAddressCompany(ImportColumn):
|
||||
identifier = 'invoice_address_company'
|
||||
|
||||
@property
|
||||
def verbose_name(self):
|
||||
return _('Invoice address') + ': ' + _('Company')
|
||||
|
||||
def assign(self, value, order, position, invoice_address, **kwargs):
|
||||
invoice_address.company = value or ''
|
||||
invoice_address.is_business = bool(value)
|
||||
|
||||
|
||||
class InvoiceAddressNamePart(ImportColumn):
|
||||
def __init__(self, event, key, label):
|
||||
self.key = key
|
||||
self.label = label
|
||||
super().__init__(event)
|
||||
|
||||
@property
|
||||
def verbose_name(self):
|
||||
return _('Invoice address') + ': ' + str(self.label)
|
||||
|
||||
@property
|
||||
def identifier(self):
|
||||
return 'invoice_address_name_{}'.format(self.key)
|
||||
|
||||
def assign(self, value, order, position, invoice_address, **kwargs):
|
||||
invoice_address.name_parts[self.key] = value or ''
|
||||
|
||||
|
||||
class InvoiceAddressStreet(ImportColumn):
|
||||
identifier = 'invoice_address_street'
|
||||
|
||||
@property
|
||||
def verbose_name(self):
|
||||
return _('Invoice address') + ': ' + _('Address')
|
||||
|
||||
def assign(self, value, order, position, invoice_address, **kwargs):
|
||||
invoice_address.address = value or ''
|
||||
|
||||
|
||||
class InvoiceAddressZip(ImportColumn):
|
||||
identifier = 'invoice_address_zipcode'
|
||||
|
||||
@property
|
||||
def verbose_name(self):
|
||||
return _('Invoice address') + ': ' + _('ZIP code')
|
||||
|
||||
def assign(self, value, order, position, invoice_address, **kwargs):
|
||||
invoice_address.zipcode = value or ''
|
||||
|
||||
|
||||
class InvoiceAddressCity(ImportColumn):
|
||||
identifier = 'invoice_address_city'
|
||||
|
||||
@property
|
||||
def verbose_name(self):
|
||||
return _('Invoice address') + ': ' + _('City')
|
||||
|
||||
def assign(self, value, order, position, invoice_address, **kwargs):
|
||||
invoice_address.city = value or ''
|
||||
|
||||
|
||||
class InvoiceAddressCountry(ImportColumn):
|
||||
identifier = 'invoice_address_country'
|
||||
default_value = None
|
||||
|
||||
@property
|
||||
def initial(self):
|
||||
return 'static:' + str(guess_country(self.event))
|
||||
|
||||
@property
|
||||
def verbose_name(self):
|
||||
return _('Invoice address') + ': ' + _('Country')
|
||||
|
||||
def static_choices(self):
|
||||
return list(countries)
|
||||
|
||||
def clean(self, value, previous_values):
|
||||
if value and not Country(value).numeric:
|
||||
raise ValidationError(_("Please enter a valid country code."))
|
||||
return value
|
||||
|
||||
def assign(self, value, order, position, invoice_address, **kwargs):
|
||||
invoice_address.country = value
|
||||
|
||||
|
||||
class InvoiceAddressState(ImportColumn):
|
||||
identifier = 'invoice_address_state'
|
||||
|
||||
@property
|
||||
def verbose_name(self):
|
||||
return _('Invoice address') + ': ' + _('State')
|
||||
|
||||
def clean(self, value, previous_values):
|
||||
if value:
|
||||
if previous_values.get('invoice_address_country') not in COUNTRIES_WITH_STATE_IN_ADDRESS:
|
||||
raise ValidationError(_("States are not supported for this country."))
|
||||
|
||||
types, form = COUNTRIES_WITH_STATE_IN_ADDRESS[previous_values.get('invoice_address_country')]
|
||||
match = [
|
||||
s for s in pycountry.subdivisions.get(country_code=previous_values.get('invoice_address_country'))
|
||||
if s.type in types and (s.code[3:] == value or s.name == value)
|
||||
]
|
||||
if len(match) == 0:
|
||||
raise ValidationError(_("Please enter a valid state."))
|
||||
return match[0].code[3:]
|
||||
|
||||
def assign(self, value, order, position, invoice_address, **kwargs):
|
||||
invoice_address.state = value or ''
|
||||
|
||||
|
||||
class InvoiceAddressVATID(ImportColumn):
|
||||
identifier = 'invoice_address_vat_id'
|
||||
|
||||
@property
|
||||
def verbose_name(self):
|
||||
return _('Invoice address') + ': ' + _('VAT ID')
|
||||
|
||||
def assign(self, value, order, position, invoice_address, **kwargs):
|
||||
invoice_address.vat_id = value or ''
|
||||
|
||||
|
||||
class InvoiceAddressReference(ImportColumn):
|
||||
identifier = 'invoice_address_internal_reference'
|
||||
|
||||
@property
|
||||
def verbose_name(self):
|
||||
return _('Invoice address') + ': ' + _('Internal reference')
|
||||
|
||||
def assign(self, value, order, position, invoice_address, **kwargs):
|
||||
invoice_address.internal_reference = value or ''
|
||||
|
||||
|
||||
class AttendeeNamePart(ImportColumn):
|
||||
def __init__(self, event, key, label):
|
||||
self.key = key
|
||||
self.label = label
|
||||
super().__init__(event)
|
||||
|
||||
@property
|
||||
def verbose_name(self):
|
||||
return _('Attendee name') + ': ' + str(self.label)
|
||||
|
||||
@property
|
||||
def identifier(self):
|
||||
return 'attendee_name_{}'.format(self.key)
|
||||
|
||||
def assign(self, value, order, position, invoice_address, **kwargs):
|
||||
position.attendee_name_parts[self.key] = value or ''
|
||||
|
||||
|
||||
class AttendeeEmail(ImportColumn):
|
||||
identifier = 'attendee_email'
|
||||
verbose_name = gettext_lazy('Attendee e-mail address')
|
||||
|
||||
def clean(self, value, previous_values):
|
||||
if value:
|
||||
EmailValidator()(value)
|
||||
return value
|
||||
|
||||
def assign(self, value, order, position, invoice_address, **kwargs):
|
||||
position.attendee_email = value
|
||||
|
||||
|
||||
class Price(ImportColumn):
|
||||
identifier = 'price'
|
||||
verbose_name = gettext_lazy('Price')
|
||||
default_label = gettext_lazy('Calculate from product')
|
||||
|
||||
def clean(self, value, previous_values):
|
||||
if value not in (None, ''):
|
||||
value = formats.sanitize_separators(re.sub(r'[^0-9.,-]', '', value))
|
||||
try:
|
||||
value = Decimal(value)
|
||||
except (DecimalException, TypeError):
|
||||
raise ValidationError(_('You entered an invalid number.'))
|
||||
return value
|
||||
|
||||
def assign(self, value, order, position, invoice_address, **kwargs):
|
||||
if value is None:
|
||||
p = get_price(position.item, position.variation, position.voucher, subevent=position.subevent,
|
||||
invoice_address=invoice_address)
|
||||
else:
|
||||
p = get_price(position.item, position.variation, position.voucher, subevent=position.subevent,
|
||||
invoice_address=invoice_address, custom_price=value, force_custom_price=True)
|
||||
position.price = p.gross
|
||||
position.tax_rule = position.item.tax_rule
|
||||
position.tax_rate = p.rate
|
||||
position.tax_value = p.tax
|
||||
|
||||
|
||||
class Secret(ImportColumn):
|
||||
identifier = 'secret'
|
||||
verbose_name = gettext_lazy('Ticket code')
|
||||
default_label = gettext_lazy('Generate automatically')
|
||||
|
||||
def __init__(self, *args):
|
||||
self._cached = set()
|
||||
super().__init__(*args)
|
||||
|
||||
def clean(self, value, previous_values):
|
||||
if value and (value in self._cached or OrderPosition.all.filter(order__event=self.event, secret=value).exists()):
|
||||
raise ValidationError(
|
||||
_('You cannot assign a position secret that already exists.')
|
||||
)
|
||||
self._cached.add(value)
|
||||
return value
|
||||
|
||||
def assign(self, value, order, position, invoice_address, **kwargs):
|
||||
if value:
|
||||
position.secret = value
|
||||
|
||||
|
||||
class Locale(ImportColumn):
|
||||
identifier = 'locale'
|
||||
verbose_name = gettext_lazy('Order locale')
|
||||
default_value = None
|
||||
|
||||
@property
|
||||
def initial(self):
|
||||
return 'static:' + self.event.settings.locale
|
||||
|
||||
def static_choices(self):
|
||||
locale_names = dict(settings.LANGUAGES)
|
||||
return [
|
||||
(a, locale_names[a]) for a in self.event.settings.locales
|
||||
]
|
||||
|
||||
def clean(self, value, previous_values):
|
||||
if not value:
|
||||
value = self.event.settings.locale
|
||||
if value not in self.event.settings.locales:
|
||||
raise ValidationError(_("Please enter a valid language code."))
|
||||
return value
|
||||
|
||||
def assign(self, value, order, position, invoice_address, **kwargs):
|
||||
order.locale = value
|
||||
|
||||
|
||||
class Saleschannel(ImportColumn):
|
||||
identifier = 'sales_channel'
|
||||
verbose_name = gettext_lazy('Sales channel')
|
||||
|
||||
def static_choices(self):
|
||||
return [
|
||||
(sc.identifier, sc.verbose_name) for sc in get_all_sales_channels().values()
|
||||
]
|
||||
|
||||
def clean(self, value, previous_values):
|
||||
if not value:
|
||||
value = 'web'
|
||||
if value not in get_all_sales_channels():
|
||||
raise ValidationError(_("Please enter a valid sales channel."))
|
||||
return value
|
||||
|
||||
def assign(self, value, order, position, invoice_address, **kwargs):
|
||||
order.sales_channel = value
|
||||
|
||||
|
||||
class SeatColumn(ImportColumn):
|
||||
identifier = 'seat'
|
||||
verbose_name = gettext_lazy('Seat ID')
|
||||
|
||||
def __init__(self, *args):
|
||||
self._cached = set()
|
||||
super().__init__(*args)
|
||||
|
||||
def clean(self, value, previous_values):
|
||||
if value:
|
||||
try:
|
||||
value = Seat.objects.get(
|
||||
seat_guid=value,
|
||||
subevent=previous_values.get('subevent')
|
||||
)
|
||||
except Seat.DoesNotExist:
|
||||
raise ValidationError(_('No matching seat was found.'))
|
||||
if not value.is_available() or value in self._cached:
|
||||
raise ValidationError(
|
||||
_('The seat you selected has already been taken. Please select a different seat.'))
|
||||
self._cached.add(value)
|
||||
elif previous_values['item'].seat_category_mappings.filter(subevent=previous_values.get('subevent')).exists():
|
||||
raise ValidationError(_('You need to select a specific seat.'))
|
||||
return value
|
||||
|
||||
def assign(self, value, order, position, invoice_address, **kwargs):
|
||||
position.seat = value
|
||||
|
||||
|
||||
class Comment(ImportColumn):
|
||||
identifier = 'comment'
|
||||
verbose_name = gettext_lazy('Comment')
|
||||
|
||||
def assign(self, value, order, position, invoice_address, **kwargs):
|
||||
order.comment = value or ''
|
||||
|
||||
|
||||
class QuestionColumn(ImportColumn):
|
||||
def __init__(self, event, q):
|
||||
self.q = q
|
||||
super().__init__(event)
|
||||
|
||||
@property
|
||||
def verbose_name(self):
|
||||
return _('Question') + ': ' + str(self.q.question)
|
||||
|
||||
@property
|
||||
def identifier(self):
|
||||
return 'question_{}'.format(self.q.pk)
|
||||
|
||||
def clean(self, value, previous_values):
|
||||
if value:
|
||||
return self.q.clean_answer(value)
|
||||
|
||||
def assign(self, value, order, position, invoice_address, **kwargs):
|
||||
if value:
|
||||
if not hasattr(order, '_answers'):
|
||||
order._answers = []
|
||||
if isinstance(value, QuestionOption):
|
||||
a = QuestionAnswer(orderposition=position, question=self.q, answer=str(value))
|
||||
a._options = [value]
|
||||
order._answers.append(a)
|
||||
elif isinstance(value, list):
|
||||
a = QuestionAnswer(orderposition=position, question=self.q, answer=', '.join(str(v) for v in value))
|
||||
a._options = value
|
||||
order._answers.append(a)
|
||||
else:
|
||||
order._answers.append(QuestionAnswer(question=self.q, answer=str(value), orderposition=position))
|
||||
|
||||
def save(self, order):
|
||||
for a in getattr(order, '_answers', []):
|
||||
a.orderposition = a.orderposition # This is apparently required after save() again
|
||||
a.save()
|
||||
if hasattr(a, '_options'):
|
||||
a.options.add(*a._options)
|
||||
|
||||
|
||||
def get_all_columns(event):
|
||||
default = []
|
||||
if event.has_subevents:
|
||||
default.append(SubeventColumn(event))
|
||||
default += [
|
||||
EmailColumn(event),
|
||||
ItemColumn(event),
|
||||
Variation(event),
|
||||
InvoiceAddressCompany(event),
|
||||
]
|
||||
scheme = PERSON_NAME_SCHEMES.get(event.settings.name_scheme)
|
||||
for n, l, w in scheme['fields']:
|
||||
default.append(InvoiceAddressNamePart(event, n, l))
|
||||
default += [
|
||||
InvoiceAddressStreet(event),
|
||||
InvoiceAddressZip(event),
|
||||
InvoiceAddressCity(event),
|
||||
InvoiceAddressCountry(event),
|
||||
InvoiceAddressState(event),
|
||||
InvoiceAddressVATID(event),
|
||||
InvoiceAddressReference(event),
|
||||
]
|
||||
for n, l, w in scheme['fields']:
|
||||
default.append(AttendeeNamePart(event, n, l))
|
||||
default += [
|
||||
AttendeeEmail(event),
|
||||
Price(event),
|
||||
Secret(event),
|
||||
Locale(event),
|
||||
Saleschannel(event),
|
||||
SeatColumn(event),
|
||||
Comment(event)
|
||||
]
|
||||
for q in event.questions.exclude(type='F'):
|
||||
default.append(QuestionColumn(event, q))
|
||||
|
||||
for recv, resp in order_import_columns.send(sender=event):
|
||||
default += resp
|
||||
|
||||
return default
|
||||
173
src/pretix/base/services/orderimport.py
Normal file
173
src/pretix/base/services/orderimport.py
Normal file
@@ -0,0 +1,173 @@
|
||||
import csv
|
||||
import io
|
||||
from decimal import Decimal
|
||||
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.db import transaction
|
||||
from django.utils.timezone import now
|
||||
from django.utils.translation import gettext as _
|
||||
|
||||
from pretix.base.i18n import LazyLocaleException, language
|
||||
from pretix.base.models import (
|
||||
CachedFile, Event, InvoiceAddress, Order, OrderPayment, OrderPosition,
|
||||
User,
|
||||
)
|
||||
from pretix.base.orderimport import get_all_columns
|
||||
from pretix.base.services.invoices import generate_invoice, invoice_qualified
|
||||
from pretix.base.services.tasks import ProfiledEventTask
|
||||
from pretix.base.signals import order_paid, order_placed
|
||||
from pretix.celery_app import app
|
||||
|
||||
|
||||
class DataImportError(LazyLocaleException):
|
||||
def __init__(self, *args):
|
||||
msg = args[0]
|
||||
msgargs = args[1] if len(args) > 1 else None
|
||||
self.args = args
|
||||
if msgargs:
|
||||
msg = _(msg) % msgargs
|
||||
else:
|
||||
msg = _(msg)
|
||||
super().__init__(msg)
|
||||
|
||||
|
||||
def parse_csv(file, length=None):
|
||||
data = file.read(length)
|
||||
try:
|
||||
import chardet
|
||||
charset = chardet.detect(data)['encoding']
|
||||
except ImportError:
|
||||
charset = file.charset
|
||||
data = data.decode(charset or 'utf-8')
|
||||
# If the file was modified on a Mac, it only contains \r as line breaks
|
||||
if '\r' in data and '\n' not in data:
|
||||
data = data.replace('\r', '\n')
|
||||
|
||||
dialect = csv.Sniffer().sniff(data.split("\n")[0], delimiters=";,.#:")
|
||||
if dialect is None:
|
||||
return None
|
||||
|
||||
reader = csv.DictReader(io.StringIO(data), dialect=dialect)
|
||||
return reader
|
||||
|
||||
|
||||
def setif(record, obj, attr, setting):
|
||||
if setting.startswith('csv:'):
|
||||
setattr(obj, attr, record[setting[4:]] or '')
|
||||
|
||||
|
||||
@app.task(base=ProfiledEventTask, throws=(DataImportError,))
|
||||
def import_orders(event: Event, fileid: str, settings: dict, locale: str, user) -> None:
|
||||
# TODO: quotacheck?
|
||||
cf = CachedFile.objects.get(id=fileid)
|
||||
user = User.objects.get(pk=user)
|
||||
with language(locale):
|
||||
cols = get_all_columns(event)
|
||||
parsed = parse_csv(cf.file)
|
||||
orders = []
|
||||
order = None
|
||||
data = []
|
||||
|
||||
# Run validation
|
||||
for i, record in enumerate(parsed):
|
||||
values = {}
|
||||
for c in cols:
|
||||
val = c.resolve(settings, record)
|
||||
try:
|
||||
values[c.identifier] = c.clean(val, values)
|
||||
except ValidationError as e:
|
||||
raise DataImportError(
|
||||
_(
|
||||
'Error while importing value "{value}" for column "{column}" in line "{line}": {message}').format(
|
||||
value=val if val is not None else '', column=c.verbose_name, line=i + 1, message=e.message
|
||||
)
|
||||
)
|
||||
data.append(values)
|
||||
|
||||
# Prepare model objects. Yes, this might consume lots of RAM, but allows us to make the actual SQL transaction
|
||||
# shorter. We'll see what works better in reality…
|
||||
for i, record in enumerate(data):
|
||||
try:
|
||||
if order is None or settings['orders'] == 'many':
|
||||
order = Order(
|
||||
event=event,
|
||||
testmode=settings['testmode'],
|
||||
)
|
||||
order.meta_info = {}
|
||||
order._positions = []
|
||||
order._address = InvoiceAddress()
|
||||
order._address.name_parts = {'_scheme': event.settings.name_scheme}
|
||||
orders.append(order)
|
||||
|
||||
position = OrderPosition()
|
||||
position.attendee_name_parts = {'_scheme': event.settings.name_scheme}
|
||||
position.meta_info = {}
|
||||
order._positions.append(position)
|
||||
position.assign_pseudonymization_id()
|
||||
|
||||
for c in cols:
|
||||
c.assign(record.get(c.identifier), order, position, order._address)
|
||||
|
||||
except ImportError as e:
|
||||
raise ImportError(
|
||||
_('Invalid data in row {row}: {message}').format(row=i, message=str(e))
|
||||
)
|
||||
|
||||
# quota check?
|
||||
with event.lock():
|
||||
with transaction.atomic():
|
||||
for o in orders:
|
||||
o.total = sum([c.price for c in o._positions]) # currently no support for fees
|
||||
if o.total == Decimal('0.00'):
|
||||
o.status = Order.STATUS_PAID
|
||||
o.save()
|
||||
OrderPayment.objects.create(
|
||||
local_id=1,
|
||||
order=o,
|
||||
amount=Decimal('0.00'),
|
||||
provider='free',
|
||||
info='{}',
|
||||
payment_date=now(),
|
||||
state=OrderPayment.PAYMENT_STATE_CONFIRMED
|
||||
)
|
||||
elif settings['status'] == 'paid':
|
||||
o.status = Order.STATUS_PAID
|
||||
o.save()
|
||||
OrderPayment.objects.create(
|
||||
local_id=1,
|
||||
order=o,
|
||||
amount=o.total,
|
||||
provider='manual',
|
||||
info='{}',
|
||||
payment_date=now(),
|
||||
state=OrderPayment.PAYMENT_STATE_CONFIRMED
|
||||
)
|
||||
else:
|
||||
o.status = Order.STATUS_PENDING
|
||||
o.save()
|
||||
for p in o._positions:
|
||||
p.order = o
|
||||
p.save()
|
||||
o._address.order = o
|
||||
o._address.save()
|
||||
for c in cols:
|
||||
c.save(o)
|
||||
o.log_action(
|
||||
'pretix.event.order.placed',
|
||||
user=user,
|
||||
data={'source': 'import'}
|
||||
)
|
||||
|
||||
for o in orders:
|
||||
with language(o.locale):
|
||||
order_placed.send(event, order=o)
|
||||
if o.status == Order.STATUS_PAID:
|
||||
order_paid.send(event, order=o)
|
||||
|
||||
gen_invoice = invoice_qualified(o) and (
|
||||
(event.settings.get('invoice_generate') == 'True') or
|
||||
(event.settings.get('invoice_generate') == 'paid' and o.status == Order.STATUS_PAID)
|
||||
) and not o.invoices.last()
|
||||
if gen_invoice:
|
||||
generate_invoice(o, trigger_pdf=True)
|
||||
cf.delete()
|
||||
@@ -683,6 +683,10 @@ Your {event} team"""))
|
||||
)),
|
||||
'type': LazyI18nString
|
||||
},
|
||||
'order_import_settings': {
|
||||
'default': '{}',
|
||||
'type': dict
|
||||
},
|
||||
'organizer_info_text': {
|
||||
'default': '',
|
||||
'type': LazyI18nString
|
||||
|
||||
@@ -630,3 +630,14 @@ invoice_line_text = EventPluginSignal(
|
||||
This signal is sent out when an invoice is built for an order. You can return additional text that
|
||||
should be shown on the invoice for the given ``position``.
|
||||
"""
|
||||
|
||||
order_import_columns = EventPluginSignal(
|
||||
providing_args=[]
|
||||
)
|
||||
"""
|
||||
This signal is sent out if the user performs an import of orders from an external source. You can use this
|
||||
to define additional columns that can be read during import. You are expected to return a list of instances of
|
||||
``ImportColumn`` subclasses.
|
||||
|
||||
As with all event-plugin signals, the ``sender`` keyword argument will contain the event.
|
||||
"""
|
||||
|
||||
53
src/pretix/control/forms/orderimport.py
Normal file
53
src/pretix/control/forms/orderimport.py
Normal file
@@ -0,0 +1,53 @@
|
||||
from django import forms
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
from pretix.base.services.orderimport import get_all_columns
|
||||
|
||||
|
||||
class ProcessForm(forms.Form):
|
||||
orders = forms.ChoiceField(
|
||||
label=_('Import mode'),
|
||||
choices=(
|
||||
('many', _('Create a separate order for each line')),
|
||||
('one', _('Create one order with one position per line')),
|
||||
)
|
||||
)
|
||||
status = forms.ChoiceField(
|
||||
label=_('Order status'),
|
||||
choices=(
|
||||
('paid', _('Create orders as fully paid')),
|
||||
('pending', _('Create orders as pending and still require payment')),
|
||||
)
|
||||
)
|
||||
testmode = forms.BooleanField(
|
||||
label=_('Create orders as test mode orders'),
|
||||
required=False
|
||||
)
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
headers = kwargs.pop('headers')
|
||||
initital = kwargs.pop('initial', {})
|
||||
self.event = kwargs.pop('event')
|
||||
initital['testmode'] = self.event.testmode
|
||||
kwargs['initial'] = initital
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
header_choices = [
|
||||
('csv:{}'.format(h), _('CSV column: "{name}"').format(name=h)) for h in headers
|
||||
]
|
||||
|
||||
for c in get_all_columns(self.event):
|
||||
choices = []
|
||||
if c.default_value:
|
||||
choices.append((c.default_value, c.default_label))
|
||||
choices += header_choices
|
||||
for k, v in c.static_choices():
|
||||
choices.append(('static:{}'.format(k), v))
|
||||
|
||||
self.fields[c.identifier] = forms.ChoiceField(
|
||||
label=str(c.verbose_name),
|
||||
choices=choices,
|
||||
widget=forms.Select(
|
||||
attrs={'data-static': 'true'}
|
||||
)
|
||||
)
|
||||
@@ -169,6 +169,57 @@ def get_event_navigation(request: HttpRequest):
|
||||
})
|
||||
|
||||
if 'can_view_orders' in request.eventpermset:
|
||||
children = [
|
||||
{
|
||||
'label': _('All orders'),
|
||||
'url': reverse('control:event.orders', kwargs={
|
||||
'event': request.event.slug,
|
||||
'organizer': request.event.organizer.slug,
|
||||
}),
|
||||
'active': url.url_name in ('event.orders', 'event.order') or "event.order." in url.url_name,
|
||||
},
|
||||
{
|
||||
'label': _('Overview'),
|
||||
'url': reverse('control:event.orders.overview', kwargs={
|
||||
'event': request.event.slug,
|
||||
'organizer': request.event.organizer.slug,
|
||||
}),
|
||||
'active': 'event.orders.overview' in url.url_name,
|
||||
},
|
||||
{
|
||||
'label': _('Refunds'),
|
||||
'url': reverse('control:event.orders.refunds', kwargs={
|
||||
'event': request.event.slug,
|
||||
'organizer': request.event.organizer.slug,
|
||||
}),
|
||||
'active': 'event.orders.refunds' in url.url_name,
|
||||
},
|
||||
{
|
||||
'label': _('Export'),
|
||||
'url': reverse('control:event.orders.export', kwargs={
|
||||
'event': request.event.slug,
|
||||
'organizer': request.event.organizer.slug,
|
||||
}),
|
||||
'active': 'event.orders.export' in url.url_name,
|
||||
},
|
||||
{
|
||||
'label': _('Waiting list'),
|
||||
'url': reverse('control:event.orders.waitinglist', kwargs={
|
||||
'event': request.event.slug,
|
||||
'organizer': request.event.organizer.slug,
|
||||
}),
|
||||
'active': 'event.orders.waitinglist' in url.url_name,
|
||||
},
|
||||
]
|
||||
if 'can_change_orders' in request.eventpermset:
|
||||
children.append({
|
||||
'label': _('Import'),
|
||||
'url': reverse('control:event.orders.import', kwargs={
|
||||
'event': request.event.slug,
|
||||
'organizer': request.event.organizer.slug,
|
||||
}),
|
||||
'active': 'event.orders.import' in url.url_name,
|
||||
})
|
||||
nav.append({
|
||||
'label': _('Orders'),
|
||||
'url': reverse('control:event.orders', kwargs={
|
||||
@@ -177,48 +228,7 @@ def get_event_navigation(request: HttpRequest):
|
||||
}),
|
||||
'active': False,
|
||||
'icon': 'shopping-cart',
|
||||
'children': [
|
||||
{
|
||||
'label': _('All orders'),
|
||||
'url': reverse('control:event.orders', kwargs={
|
||||
'event': request.event.slug,
|
||||
'organizer': request.event.organizer.slug,
|
||||
}),
|
||||
'active': url.url_name in ('event.orders', 'event.order') or "event.order." in url.url_name,
|
||||
},
|
||||
{
|
||||
'label': _('Overview'),
|
||||
'url': reverse('control:event.orders.overview', kwargs={
|
||||
'event': request.event.slug,
|
||||
'organizer': request.event.organizer.slug,
|
||||
}),
|
||||
'active': 'event.orders.overview' in url.url_name,
|
||||
},
|
||||
{
|
||||
'label': _('Refunds'),
|
||||
'url': reverse('control:event.orders.refunds', kwargs={
|
||||
'event': request.event.slug,
|
||||
'organizer': request.event.organizer.slug,
|
||||
}),
|
||||
'active': 'event.orders.refunds' in url.url_name,
|
||||
},
|
||||
{
|
||||
'label': _('Export'),
|
||||
'url': reverse('control:event.orders.export', kwargs={
|
||||
'event': request.event.slug,
|
||||
'organizer': request.event.organizer.slug,
|
||||
}),
|
||||
'active': 'event.orders.export' in url.url_name,
|
||||
},
|
||||
{
|
||||
'label': _('Waiting list'),
|
||||
'url': reverse('control:event.orders.waitinglist', kwargs={
|
||||
'event': request.event.slug,
|
||||
'organizer': request.event.organizer.slug,
|
||||
}),
|
||||
'active': 'event.orders.waitinglist' in url.url_name,
|
||||
},
|
||||
]
|
||||
'children': children
|
||||
})
|
||||
|
||||
if 'can_view_vouchers' in request.eventpermset:
|
||||
|
||||
@@ -0,0 +1,61 @@
|
||||
{% extends "pretixcontrol/event/base.html" %}
|
||||
{% load i18n %}
|
||||
{% load static %}
|
||||
{% load getitem %}
|
||||
{% load bootstrap3 %}
|
||||
{% block title %}{% trans "Import attendees" %}{% endblock %}
|
||||
{% block content %}
|
||||
<h1>{% trans "Import attendees" %}</h1>
|
||||
<form action="" method="post" class="form-horizontal" data-asynctask data-asynctask-long>
|
||||
{% csrf_token %}
|
||||
<div class="panel panel-default">
|
||||
<div class="panel-heading">
|
||||
<h3 class="panel-title">{% trans "Data preview" %}</h3>
|
||||
</div>
|
||||
<div class="table-responsive panel-body">
|
||||
<table class="table table-condensed">
|
||||
<thead>
|
||||
<tr>
|
||||
{% for fn in parsed.fieldnames %}
|
||||
<th>{{ fn }}</th>
|
||||
{% endfor %}
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
{% for r in sample_rows %}
|
||||
<tr>
|
||||
{% for fn in parsed.fieldnames %}
|
||||
<td>{{ r|getitem:fn }}</td>
|
||||
{% endfor %}
|
||||
</tr>
|
||||
{% endfor %}
|
||||
<tr>
|
||||
<td class="text-center" colspan="{{ parsed.fieldnames|length }}">
|
||||
…
|
||||
</td>
|
||||
</tr>
|
||||
</tbody>
|
||||
</table>
|
||||
</div>
|
||||
</div>
|
||||
<div class="panel panel-default">
|
||||
<div class="panel-heading">
|
||||
<h3 class="panel-title">{% trans "Import settings" %}</h3>
|
||||
</div>
|
||||
<div class="panel-body">
|
||||
{% bootstrap_form_errors form %}
|
||||
{% bootstrap_form form layout="horizontal" %}
|
||||
<div class="alert alert-warning">
|
||||
{% blocktrans trimmed %}
|
||||
The import will be performed regardless of your quotas, so it will be possible to overbook your event using this option.
|
||||
{% endblocktrans %}
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
<div class="form-group submit-group">
|
||||
<button type="submit" class="btn btn-primary btn-save">
|
||||
{% trans "Perform import" %}
|
||||
</button>
|
||||
</div>
|
||||
</form>
|
||||
{% endblock %}
|
||||
@@ -0,0 +1,31 @@
|
||||
{% extends "pretixcontrol/event/base.html" %}
|
||||
{% load i18n %}
|
||||
{% load static %}
|
||||
{% block title %}{% trans "Import attendees" %}{% endblock %}
|
||||
{% block content %}
|
||||
<h1>{% trans "Import attendees" %}</h1>
|
||||
|
||||
<div class="panel panel-default">
|
||||
<div class="panel-heading">
|
||||
<h3 class="panel-title">{% trans "Upload a new file" %}</h3>
|
||||
</div>
|
||||
<div class="panel-body">
|
||||
<form action="" method="post" enctype="multipart/form-data" class="form-inline">
|
||||
{% csrf_token %}
|
||||
<p>
|
||||
{% blocktrans trimmed %}
|
||||
The uploaded file should be a CSV file with a header row. You will be able to assign the
|
||||
meanings of the different columns in the next step.
|
||||
{% endblocktrans %}
|
||||
</p>
|
||||
<div class="form-group">
|
||||
<label for="file">{% trans "Import file" %}: </label> <input id="file" type="file" name="file"/>
|
||||
</div>
|
||||
<div class="clearfix"></div>
|
||||
<button class="btn btn-primary pull-right flip" type="submit">
|
||||
<span class="icon icon-upload"></span> {% trans "Start import" %}
|
||||
</button>
|
||||
</form>
|
||||
</div>
|
||||
</div>
|
||||
{% endblock %}
|
||||
11
src/pretix/control/templatetags/getitem.py
Normal file
11
src/pretix/control/templatetags/getitem.py
Normal file
@@ -0,0 +1,11 @@
|
||||
from django import template
|
||||
|
||||
register = template.Library()
|
||||
|
||||
|
||||
@register.filter(name='getitem')
|
||||
def getitem_filter(value, itemname):
|
||||
if not value:
|
||||
return ''
|
||||
|
||||
return value[itemname]
|
||||
@@ -2,8 +2,8 @@ from django.conf.urls import include, url
|
||||
|
||||
from pretix.control.views import (
|
||||
auth, checkin, dashboards, event, geo, global_settings, item, main, oauth,
|
||||
orders, organizer, pdf, search, shredder, subevents, typeahead, user,
|
||||
users, vouchers, waitinglist,
|
||||
orderimport, orders, organizer, pdf, search, shredder, subevents,
|
||||
typeahead, user, users, vouchers, waitinglist,
|
||||
)
|
||||
|
||||
urlpatterns = [
|
||||
@@ -257,6 +257,8 @@ urlpatterns = [
|
||||
url(r'^invoice/(?P<invoice>[^/]+)$', orders.InvoiceDownload.as_view(),
|
||||
name='event.invoice.download'),
|
||||
url(r'^orders/overview/$', orders.OverView.as_view(), name='event.orders.overview'),
|
||||
url(r'^orders/import/$', orderimport.ImportView.as_view(), name='event.orders.import'),
|
||||
url(r'^orders/import/(?P<file>[^/]+)/$', orderimport.ProcessView.as_view(), name='event.orders.import.process'),
|
||||
url(r'^orders/export/$', orders.ExportView.as_view(), name='event.orders.export'),
|
||||
url(r'^orders/export/do$', orders.ExportDoView.as_view(), name='event.orders.export.do'),
|
||||
url(r'^orders/refunds/$', orders.RefundList.as_view(), name='event.orders.refunds'),
|
||||
|
||||
125
src/pretix/control/views/orderimport.py
Normal file
125
src/pretix/control/views/orderimport.py
Normal file
@@ -0,0 +1,125 @@
|
||||
import logging
|
||||
from datetime import timedelta
|
||||
|
||||
from django.conf import settings
|
||||
from django.contrib import messages
|
||||
from django.shortcuts import get_object_or_404, redirect
|
||||
from django.urls import reverse
|
||||
from django.utils.functional import cached_property
|
||||
from django.utils.timezone import now
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
from django.views.generic import FormView, TemplateView
|
||||
|
||||
from pretix.base.models import CachedFile
|
||||
from pretix.base.services.orderimport import import_orders, parse_csv
|
||||
from pretix.base.views.tasks import AsyncAction
|
||||
from pretix.control.forms.orderimport import ProcessForm
|
||||
from pretix.control.permissions import EventPermissionRequiredMixin
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ImportView(EventPermissionRequiredMixin, TemplateView):
|
||||
template_name = 'pretixcontrol/orders/import_start.html'
|
||||
permission = 'can_change_orders'
|
||||
|
||||
def post(self, request, *args, **kwargs):
|
||||
if 'file' not in request.FILES:
|
||||
return redirect(reverse('control:event.orders.import', kwargs={
|
||||
'event': request.event.slug,
|
||||
'organizer': request.organizer.slug,
|
||||
}))
|
||||
if not request.FILES['file'].name.endswith('.csv'):
|
||||
messages.error(request, _('Please only upload CSV files.'))
|
||||
return redirect(reverse('control:event.orders.import', kwargs={
|
||||
'event': request.event.slug,
|
||||
'organizer': request.organizer.slug,
|
||||
}))
|
||||
if request.FILES['file'].size > 1024 * 1024 * 10:
|
||||
messages.error(request, _('Please do not upload files larger than 10 MB.'))
|
||||
return redirect(reverse('control:event.orders.import', kwargs={
|
||||
'event': request.event.slug,
|
||||
'organizer': request.organizer.slug,
|
||||
}))
|
||||
|
||||
cf = CachedFile.objects.create(
|
||||
expires=now() + timedelta(days=1),
|
||||
date=now(),
|
||||
filename='import.csv',
|
||||
type='text/csv',
|
||||
)
|
||||
cf.file.save('import.csv', request.FILES['file'])
|
||||
return redirect(reverse('control:event.orders.import.process', kwargs={
|
||||
'event': request.event.slug,
|
||||
'organizer': request.organizer.slug,
|
||||
'file': cf.id
|
||||
}))
|
||||
|
||||
|
||||
class ProcessView(EventPermissionRequiredMixin, AsyncAction, FormView):
|
||||
permission = 'can_change_orders'
|
||||
template_name = 'pretixcontrol/orders/import_process.html'
|
||||
form_class = ProcessForm
|
||||
task = import_orders
|
||||
known_errortypes = ['DataImportError']
|
||||
|
||||
def get_form_kwargs(self):
|
||||
k = super().get_form_kwargs()
|
||||
k.update({
|
||||
'event': self.request.event,
|
||||
'initial': self.request.event.settings.order_import_settings,
|
||||
'headers': self.parsed.fieldnames
|
||||
})
|
||||
return k
|
||||
|
||||
def form_valid(self, form):
|
||||
self.request.event.settings.order_import_settings = form.cleaned_data
|
||||
return self.do(
|
||||
self.request.event.pk, self.file.id, form.cleaned_data, self.request.LANGUAGE_CODE,
|
||||
self.request.user.pk
|
||||
)
|
||||
|
||||
@cached_property
|
||||
def file(self):
|
||||
return get_object_or_404(CachedFile, pk=self.kwargs.get("file"), filename="import.csv")
|
||||
|
||||
@cached_property
|
||||
def parsed(self):
|
||||
return parse_csv(self.file.file, 1024 * 1024)
|
||||
|
||||
def get(self, request, *args, **kwargs):
|
||||
if 'async_id' in request.GET and settings.HAS_CELERY:
|
||||
return self.get_result(request)
|
||||
return FormView.get(self, request, *args, **kwargs)
|
||||
|
||||
def get_success_message(self, value):
|
||||
return _('The import was successful.')
|
||||
|
||||
def get_success_url(self, value):
|
||||
return reverse('control:event.orders', kwargs={
|
||||
'event': self.request.event.slug,
|
||||
'organizer': self.request.organizer.slug,
|
||||
})
|
||||
|
||||
def dispatch(self, request, *args, **kwargs):
|
||||
if not self.parsed:
|
||||
messages.error(request, _('We\'ve been unable to parse the uploaded file as a CSV file.'))
|
||||
return redirect(reverse('control:event.orders.import', kwargs={
|
||||
'event': request.event.slug,
|
||||
'organizer': request.organizer.slug,
|
||||
}))
|
||||
return super().dispatch(request, *args, **kwargs)
|
||||
|
||||
def get_error_url(self):
|
||||
return reverse('control:event.orders.import.process', kwargs={
|
||||
'event': self.request.event.slug,
|
||||
'organizer': self.request.organizer.slug,
|
||||
'file': self.file.id
|
||||
})
|
||||
|
||||
def get_context_data(self, **kwargs):
|
||||
ctx = super().get_context_data(**kwargs)
|
||||
ctx['file'] = self.file
|
||||
ctx['parsed'] = self.parsed
|
||||
ctx['sample_rows'] = list(self.parsed)[:3]
|
||||
return ctx
|
||||
@@ -311,7 +311,7 @@ var form_handlers = function (el) {
|
||||
dependency.closest('.form-group').find('input[name=' + dependency.attr("name") + ']').on("dp.change", update);
|
||||
});
|
||||
|
||||
$("select[name$=state]").each(function () {
|
||||
$("select[name$=state]:not([data-static])").each(function () {
|
||||
var dependent = $(this),
|
||||
counter = 0,
|
||||
dependency = $(this).closest("form").find('select[name$=country]'),
|
||||
|
||||
@@ -2277,7 +2277,9 @@ def test_question_answer_validation_multiple_choice():
|
||||
assert q.clean_answer([str(o1.pk), str(o2.pk)]) == [o1, o2]
|
||||
assert q.clean_answer([str(o1.pk)]) == [o1]
|
||||
assert q.clean_answer([o1.pk]) == [o1]
|
||||
assert q.clean_answer([o1.pk, o3.pk]) == [o1]
|
||||
assert q.clean_answer([o1.pk, o3.pk + 1000]) == [o1]
|
||||
with pytest.raises(ValidationError):
|
||||
assert q.clean_answer([o1.pk, o3.pk]) == [o1]
|
||||
with pytest.raises(ValidationError):
|
||||
assert q.clean_answer([o1.pk, o3.pk + 1000]) == [o1]
|
||||
with pytest.raises(ValidationError):
|
||||
assert q.clean_answer([o1.pk, 'FOO']) == [o1]
|
||||
|
||||
688
src/tests/base/test_orderimport.py
Normal file
688
src/tests/base/test_orderimport.py
Normal file
@@ -0,0 +1,688 @@
|
||||
import csv
|
||||
from _decimal import Decimal
|
||||
from io import StringIO
|
||||
|
||||
import pytest
|
||||
from django.core.files.base import ContentFile
|
||||
from django.utils.timezone import now
|
||||
from django_scopes import scopes_disabled
|
||||
from i18nfield.strings import LazyI18nString
|
||||
|
||||
from pretix.base.models import (
|
||||
CachedFile, Event, Item, Order, OrderPayment, OrderPosition, Organizer,
|
||||
Question, QuestionAnswer, User,
|
||||
)
|
||||
from pretix.base.services.orderimport import DataImportError, import_orders
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def event():
|
||||
o = Organizer.objects.create(name='Dummy', slug='dummy')
|
||||
event = Event.objects.create(
|
||||
organizer=o, name='Dummy', slug='dummy',
|
||||
date_from=now(), plugins='pretix.plugins.banktransfer,pretix.plugins.paypal'
|
||||
)
|
||||
return event
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def item(event):
|
||||
return Item.objects.create(event=event, name="Ticket", default_price=23)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def user():
|
||||
return User.objects.create_user('test@localhost', 'test')
|
||||
|
||||
|
||||
def inputfile_factory():
|
||||
d = [
|
||||
{
|
||||
'A': 'Dieter',
|
||||
'B': 'Schneider',
|
||||
'C': 'schneider@example.org',
|
||||
'D': 'Test',
|
||||
'E': 'Foo',
|
||||
'F': '0.00',
|
||||
'G': 'US',
|
||||
'H': 'Texas',
|
||||
'I': 'Foo',
|
||||
},
|
||||
{
|
||||
'A': 'Daniel',
|
||||
'B': 'Wulf',
|
||||
'C': 'daniel@example.org',
|
||||
'D': 'Test',
|
||||
'E': 'Bar',
|
||||
'F': '0.00',
|
||||
'G': 'DE',
|
||||
'H': '',
|
||||
'I': 'Bar',
|
||||
},
|
||||
{
|
||||
'A': 'Anke',
|
||||
'B': 'Müller',
|
||||
'C': '',
|
||||
'D': 'Test',
|
||||
'E': 'Baz',
|
||||
'F': '0.00',
|
||||
'G': 'AU',
|
||||
'H': '',
|
||||
'I': 'Foo,Bar',
|
||||
},
|
||||
]
|
||||
f = StringIO()
|
||||
w = csv.DictWriter(f, ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', "I"], dialect=csv.excel)
|
||||
w.writeheader()
|
||||
w.writerows(d)
|
||||
f.seek(0)
|
||||
c = CachedFile.objects.create(type="text/csv", filename="input.csv")
|
||||
c.file.save("input.csv", ContentFile(f.read()))
|
||||
return c
|
||||
|
||||
|
||||
DEFAULT_SETTINGS = {
|
||||
'orders': 'many',
|
||||
'testmode': False,
|
||||
'status': 'paid',
|
||||
'email': 'empty',
|
||||
'variation': 'empty',
|
||||
'invoice_address_company': 'empty',
|
||||
'invoice_address_name_full_name': 'empty',
|
||||
'invoice_address_street': 'empty',
|
||||
'invoice_address_zipcode': 'empty',
|
||||
'invoice_address_city': 'empty',
|
||||
'invoice_address_country': 'static:DE',
|
||||
'invoice_address_state': 'empty',
|
||||
'invoice_address_vat_id': 'empty',
|
||||
'invoice_address_internal_reference': 'empty',
|
||||
'attendee_name_full_name': 'empty',
|
||||
'attendee_email': 'empty',
|
||||
'price': 'empty',
|
||||
'secret': 'empty',
|
||||
'locale': 'static:en',
|
||||
'sales_channel': 'static:web',
|
||||
'comment': 'empty'
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_import_simple(event, item, user):
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
settings['item'] = 'static:{}'.format(item.pk)
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
)
|
||||
assert event.orders.count() == 3
|
||||
assert OrderPosition.objects.count() == 3
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_import_as_one_order(user, event, item):
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
settings['item'] = 'static:{}'.format(item.pk)
|
||||
settings['orders'] = 'one'
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
)
|
||||
assert event.orders.count() == 1
|
||||
o = event.orders.get()
|
||||
assert o.positions.count() == 3
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_import_in_test_mode(user, event, item):
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
settings['item'] = 'static:{}'.format(item.pk)
|
||||
settings['testmode'] = True
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
)
|
||||
assert event.orders.last().testmode
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_import_not_in_test_mode(user, event, item):
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
settings['item'] = 'static:{}'.format(item.pk)
|
||||
settings['testmode'] = False
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
)
|
||||
assert not event.orders.last().testmode
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_import_pending(user, event, item):
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
settings['item'] = 'static:{}'.format(item.pk)
|
||||
settings['status'] = 'pending'
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
)
|
||||
o = event.orders.last()
|
||||
assert o.status == Order.STATUS_PENDING
|
||||
assert o.total == Decimal('23.00')
|
||||
assert not o.payments.exists()
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_import_paid_generate_invoice(user, event, item):
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
settings['item'] = 'static:{}'.format(item.pk)
|
||||
settings['status'] = 'paid'
|
||||
event.settings.invoice_generate = 'paid'
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
)
|
||||
o = event.orders.last()
|
||||
assert o.status == Order.STATUS_PAID
|
||||
assert o.total == Decimal('23.00')
|
||||
p = o.payments.first()
|
||||
assert p.provider == 'manual'
|
||||
assert p.state == OrderPayment.PAYMENT_STATE_CONFIRMED
|
||||
assert o.invoices.count() == 1
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_import_free(user, event, item):
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
settings['item'] = 'static:{}'.format(item.pk)
|
||||
settings['price'] = 'csv:F'
|
||||
settings['status'] = 'pending'
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
)
|
||||
o = event.orders.last()
|
||||
assert o.status == Order.STATUS_PAID
|
||||
assert o.total == Decimal('0.00')
|
||||
p = o.payments.first()
|
||||
assert p.provider == 'free'
|
||||
assert p.state == OrderPayment.PAYMENT_STATE_CONFIRMED
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_import_email(user, event, item):
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
settings['item'] = 'static:{}'.format(item.pk)
|
||||
settings['email'] = 'csv:C'
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
)
|
||||
assert event.orders.filter(email="schneider@example.org").exists()
|
||||
assert event.orders.filter(email="daniel@example.org").exists()
|
||||
assert event.orders.filter(email__isnull=True).count() == 1
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_import_email_invalid(user, event, item):
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
settings['item'] = 'static:{}'.format(item.pk)
|
||||
settings['email'] = 'csv:A'
|
||||
with pytest.raises(DataImportError) as excinfo:
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
).get()
|
||||
assert 'Error while importing value "Dieter" for column "E-mail address" in line "1": Enter a valid email address.' in str(excinfo.value)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_import_attendee_email(user, event, item):
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
settings['item'] = 'static:{}'.format(item.pk)
|
||||
settings['attendee_email'] = 'csv:C'
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
)
|
||||
assert OrderPosition.objects.filter(attendee_email="schneider@example.org").exists()
|
||||
assert OrderPosition.objects.filter(attendee_email__isnull=True).count() == 1
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_import_attendee_email_invalid(user, event, item):
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
settings['item'] = 'static:{}'.format(item.pk)
|
||||
settings['attendee_email'] = 'csv:A'
|
||||
with pytest.raises(DataImportError) as excinfo:
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
).get()
|
||||
assert 'Error while importing value "Dieter" for column "Attendee e-mail address" in line "1": Enter a valid email address.' in str(excinfo.value)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_import_product(user, event, item):
|
||||
i = Item.objects.create(
|
||||
event=event,
|
||||
name=LazyI18nString({'de': 'Foo', 'en': 'Bar'}),
|
||||
internal_name='Baz',
|
||||
default_price=23,
|
||||
)
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
settings['item'] = 'csv:E'
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
)
|
||||
assert OrderPosition.objects.filter(item=i).count() == 3
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_import_product_unknown(user, event, item):
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
settings['item'] = 'csv:A'
|
||||
with pytest.raises(DataImportError) as excinfo:
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
).get()
|
||||
assert 'Error while importing value "Dieter" for column "Product" in line "1": No matching product was found.' in str(excinfo.value)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_import_product_dupl(user, event, item):
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
settings['item'] = 'csv:E'
|
||||
Item.objects.create(
|
||||
event=event,
|
||||
name='Foo',
|
||||
default_price=23,
|
||||
)
|
||||
Item.objects.create(
|
||||
event=event,
|
||||
name='Foo',
|
||||
default_price=23,
|
||||
)
|
||||
with pytest.raises(DataImportError) as excinfo:
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
).get()
|
||||
assert 'Error while importing value "Foo" for column "Product" in line "1": Multiple matching products were found.' in str(excinfo.value)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_variation_required(user, event, item):
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
settings['item'] = 'static:{}'.format(item.pk)
|
||||
item.variations.create(value='Default')
|
||||
with pytest.raises(DataImportError) as excinfo:
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
).get()
|
||||
assert 'Error while importing value "" for column "Product variation" in line "1": You need to select a variation for this product.' in str(excinfo.value)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_variation_invalid(user, event, item):
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
settings['item'] = 'static:{}'.format(item.pk)
|
||||
settings['variation'] = 'csv:E'
|
||||
item.variations.create(value='Default')
|
||||
with pytest.raises(DataImportError) as excinfo:
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
).get()
|
||||
assert 'Error while importing value "Foo" for column "Product variation" in line "1": No matching variation was found.' in str(excinfo.value)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_variation_dynamic(user, event, item):
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
settings['item'] = 'static:{}'.format(item.pk)
|
||||
settings['variation'] = 'csv:E'
|
||||
v1 = item.variations.create(value='Foo')
|
||||
v2 = item.variations.create(value=LazyI18nString({'en': 'Bar'}))
|
||||
v3 = item.variations.create(value='Baz')
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
).get()
|
||||
assert OrderPosition.objects.filter(variation=v1).count() == 1
|
||||
assert OrderPosition.objects.filter(variation=v2).count() == 1
|
||||
assert OrderPosition.objects.filter(variation=v3).count() == 1
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_company(user, event, item):
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
settings['item'] = 'static:{}'.format(item.pk)
|
||||
settings['email'] = 'csv:C'
|
||||
settings['invoice_address_company'] = 'csv:C'
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
).get()
|
||||
assert event.orders.get(email='schneider@example.org').invoice_address.company == 'schneider@example.org'
|
||||
assert event.orders.get(email='schneider@example.org').invoice_address.is_business
|
||||
assert event.orders.get(email__isnull=True).invoice_address.company == ''
|
||||
assert not event.orders.get(email__isnull=True).invoice_address.is_business
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_name_parts(user, event, item):
|
||||
event.settings.name_scheme = 'given_family'
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
settings['item'] = 'static:{}'.format(item.pk)
|
||||
settings['email'] = 'csv:C'
|
||||
settings['invoice_address_name_given_name'] = 'csv:A'
|
||||
settings['invoice_address_name_family_name'] = 'csv:B'
|
||||
settings['attendee_name_given_name'] = 'csv:A'
|
||||
settings['attendee_name_family_name'] = 'csv:B'
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
).get()
|
||||
o = event.orders.get(email='schneider@example.org')
|
||||
assert o.invoice_address.name_parts == {
|
||||
'_scheme': 'given_family',
|
||||
'given_name': 'Dieter',
|
||||
'family_name': 'Schneider'
|
||||
}
|
||||
assert o.invoice_address.name_cached == 'Dieter Schneider'
|
||||
assert o.positions.first().attendee_name_parts == {
|
||||
'_scheme': 'given_family',
|
||||
'given_name': 'Dieter',
|
||||
'family_name': 'Schneider'
|
||||
}
|
||||
assert o.positions.first().attendee_name_cached == 'Dieter Schneider'
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_import_country(user, event, item):
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
settings['item'] = 'static:{}'.format(item.pk)
|
||||
settings['invoice_address_country'] = 'csv:G'
|
||||
settings['email'] = 'csv:C'
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
)
|
||||
assert str(event.orders.get(email='schneider@example.org').invoice_address.country) == 'US'
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_import_country_invalid(user, event, item):
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
settings['item'] = 'static:{}'.format(item.pk)
|
||||
settings['invoice_address_country'] = 'csv:A'
|
||||
settings['email'] = 'csv:C'
|
||||
with pytest.raises(DataImportError) as excinfo:
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
).get()
|
||||
assert 'Error while importing value "Dieter" for column "Invoice address: Country" in line "1": Please enter a valid country code.' in str(excinfo.value)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_import_state(user, event, item):
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
settings['item'] = 'static:{}'.format(item.pk)
|
||||
settings['invoice_address_country'] = 'csv:G'
|
||||
settings['invoice_address_state'] = 'csv:H'
|
||||
settings['email'] = 'csv:C'
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
)
|
||||
assert str(event.orders.get(email='schneider@example.org').invoice_address.country) == 'US'
|
||||
assert str(event.orders.get(email='schneider@example.org').invoice_address.state) == 'TX'
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_import_state_invalid(user, event, item):
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
settings['item'] = 'static:{}'.format(item.pk)
|
||||
settings['invoice_address_country'] = 'static:AU'
|
||||
settings['invoice_address_state'] = 'csv:H'
|
||||
settings['email'] = 'csv:C'
|
||||
with pytest.raises(DataImportError) as excinfo:
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
).get()
|
||||
assert 'Error while importing value "Texas" for column "Invoice address: State" in line "1": Please enter a valid state.' in str(excinfo.value)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_import_saleschannel_invalid(user, event, item):
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
settings['item'] = 'static:{}'.format(item.pk)
|
||||
settings['sales_channel'] = 'csv:A'
|
||||
with pytest.raises(DataImportError) as excinfo:
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
).get()
|
||||
assert 'Error while importing value "Dieter" for column "Sales channel" in line "1": Please enter a valid sales channel.' in str(excinfo.value)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_import_locale_invalid(user, event, item):
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
settings['item'] = 'static:{}'.format(item.pk)
|
||||
settings['locale'] = 'static:de' # not enabled on this event
|
||||
with pytest.raises(DataImportError) as excinfo:
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
).get()
|
||||
assert 'Error while importing value "de" for column "Order locale" in line "1": Please enter a valid language code.' in str(excinfo.value)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_import_price_invalid(user, event, item):
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
settings['item'] = 'static:{}'.format(item.pk)
|
||||
settings['price'] = 'csv:A'
|
||||
with pytest.raises(DataImportError) as excinfo:
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
).get()
|
||||
assert 'Error while importing value "Dieter" for column "Price" in line "1": You ' \
|
||||
'entered an invalid number.' in str(excinfo.value)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_import_secret(user, event, item):
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
settings['item'] = 'static:{}'.format(item.pk)
|
||||
settings['secret'] = 'csv:A'
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
).get()
|
||||
assert OrderPosition.objects.filter(secret="Dieter").count() == 1
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_import_secret_dupl(user, event, item):
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
settings['item'] = 'static:{}'.format(item.pk)
|
||||
settings['secret'] = 'csv:D'
|
||||
with pytest.raises(DataImportError) as excinfo:
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
).get()
|
||||
assert 'Error while importing value "Test" for column "Ticket code" in line "2": You cannot assign a position ' \
|
||||
'secret that already exists.' in str(excinfo.value)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_import_seat_required(user, event, item):
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
settings['item'] = 'static:{}'.format(item.pk)
|
||||
|
||||
event.seat_category_mappings.create(
|
||||
layout_category='Stalls', product=item
|
||||
)
|
||||
with pytest.raises(DataImportError) as excinfo:
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
).get()
|
||||
assert 'Error while importing value "" for column "Seat ID" in line "1": You need to select ' \
|
||||
'a specific seat.' in str(excinfo.value)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_import_seat_blocked(user, event, item):
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
settings['item'] = 'static:{}'.format(item.pk)
|
||||
settings['seat'] = 'csv:D'
|
||||
|
||||
event.seat_category_mappings.create(
|
||||
layout_category='Stalls', product=item
|
||||
)
|
||||
event.seats.create(name="Test", product=item, seat_guid="Test", blocked=True)
|
||||
with pytest.raises(DataImportError) as excinfo:
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
).get()
|
||||
assert 'Error while importing value "Test" for column "Seat ID" in line "1": The seat you selected has already ' \
|
||||
'been taken. Please select a different seat.' in str(excinfo.value)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_import_seat_dbl(user, event, item):
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
settings['item'] = 'static:{}'.format(item.pk)
|
||||
settings['seat'] = 'csv:D'
|
||||
|
||||
event.seat_category_mappings.create(
|
||||
layout_category='Stalls', product=item
|
||||
)
|
||||
event.seats.create(name="Test", product=item, seat_guid="Test")
|
||||
with pytest.raises(DataImportError) as excinfo:
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
).get()
|
||||
assert 'Error while importing value "Test" for column "Seat ID" in line "2": The seat you selected has already ' \
|
||||
'been taken. Please select a different seat.' in str(excinfo.value)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_import_seat(user, event, item):
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
settings['item'] = 'static:{}'.format(item.pk)
|
||||
settings['seat'] = 'csv:E'
|
||||
|
||||
event.seat_category_mappings.create(
|
||||
layout_category='Stalls', product=item
|
||||
)
|
||||
s1 = event.seats.create(name="Foo", product=item, seat_guid="Foo")
|
||||
s2 = event.seats.create(name="Bar", product=item, seat_guid="Bar")
|
||||
s3 = event.seats.create(name="Baz", product=item, seat_guid="Baz")
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
).get()
|
||||
assert not s1.is_available()
|
||||
assert not s2.is_available()
|
||||
assert not s3.is_available()
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_import_subevent_invalid(user, event, item):
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
event.has_subevents = True
|
||||
event.save()
|
||||
event.subevents.create(name='Foo', date_from=now(), active=True)
|
||||
settings['item'] = 'static:{}'.format(item.pk)
|
||||
settings['subevent'] = 'csv:E'
|
||||
|
||||
with pytest.raises(DataImportError) as excinfo:
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
).get()
|
||||
assert 'Error while importing value "Bar" for column "Date" in line "2": No matching date was found.' in str(excinfo.value)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_import_subevent_required(user, event, item):
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
event.has_subevents = True
|
||||
event.save()
|
||||
settings['item'] = 'static:{}'.format(item.pk)
|
||||
|
||||
with pytest.raises(DataImportError) as excinfo:
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
).get()
|
||||
assert 'Error while importing value "" for column "Date" in line "1": You need to select a date.' in str(excinfo.value)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_import_subevent(user, event, item):
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
event.has_subevents = True
|
||||
event.save()
|
||||
s = event.subevents.create(name='Test', date_from=now(), active=True)
|
||||
settings['item'] = 'static:{}'.format(item.pk)
|
||||
settings['subevent'] = 'csv:D'
|
||||
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
).get()
|
||||
assert OrderPosition.objects.filter(subevent=s).count() == 3
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_import_question_validate(user, event, item):
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
q = event.questions.create(question='Foo', type=Question.TYPE_NUMBER)
|
||||
settings['item'] = 'static:{}'.format(item.pk)
|
||||
settings['question_{}'.format(q.pk)] = 'csv:D'
|
||||
|
||||
with pytest.raises(DataImportError) as excinfo:
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
).get()
|
||||
assert 'Error while importing value "Test" for column "Question: Foo" in line "1": Invalid number input.' in str(excinfo.value)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@scopes_disabled()
|
||||
def test_import_question_valid(user, event, item):
|
||||
settings = dict(DEFAULT_SETTINGS)
|
||||
q = event.questions.create(question='Foo', type=Question.TYPE_CHOICE_MULTIPLE)
|
||||
o1 = q.options.create(answer='Foo', identifier='Foo')
|
||||
o2 = q.options.create(answer='Bar', identifier='Bar')
|
||||
settings['item'] = 'static:{}'.format(item.pk)
|
||||
settings['attendee_email'] = 'csv:C'
|
||||
settings['question_{}'.format(q.pk)] = 'csv:I'
|
||||
|
||||
import_orders.apply(
|
||||
args=(event.pk, inputfile_factory().id, settings, 'en', user.pk)
|
||||
).get()
|
||||
assert QuestionAnswer.objects.filter(question=q).count() == 3
|
||||
a1 = OrderPosition.objects.get(attendee_email='schneider@example.org').answers.first()
|
||||
assert a1.question == q
|
||||
assert list(a1.options.all()) == [o1]
|
||||
a3 = OrderPosition.objects.get(attendee_email__isnull=True).answers.first()
|
||||
assert a3.question == q
|
||||
assert set(a3.options.all()) == {o1, o2}
|
||||
|
||||
# TODO: validate question
|
||||
46
src/tests/control/test_orderimport.py
Normal file
46
src/tests/control/test_orderimport.py
Normal file
@@ -0,0 +1,46 @@
|
||||
import pytest
|
||||
from bs4 import BeautifulSoup
|
||||
from django.core.files.uploadedfile import SimpleUploadedFile
|
||||
from django.utils.timezone import now
|
||||
|
||||
from pretix.base.models import Event, Organizer, Team, User
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def env():
|
||||
o = Organizer.objects.create(name='Dummy', slug='dummy')
|
||||
event = Event.objects.create(
|
||||
organizer=o, name='Dummy', slug='dummy',
|
||||
date_from=now(), plugins='pretix.plugins.banktransfer,pretix.plugins.paypal'
|
||||
)
|
||||
user = User.objects.create_user('dummy@dummy.dummy', 'dummy')
|
||||
t = Team.objects.create(organizer=event.organizer, can_view_orders=True, can_change_orders=True)
|
||||
t.members.add(user)
|
||||
t.limit_events.add(event)
|
||||
return event, user
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_import_csv_file(client, env):
|
||||
client.login(email='dummy@dummy.dummy', password='dummy')
|
||||
r = client.get('/control/event/dummy/dummy/orders/import/')
|
||||
assert r.status_code == 200
|
||||
|
||||
file = SimpleUploadedFile('file.csv', """First name,Last name,Email
|
||||
Dieter,Schneider,schneider@example.org
|
||||
Daniel,Wulf,daniel@example.org
|
||||
Daniel,Wulf,daniel@example.org
|
||||
Anke,Müller,anke@example.net
|
||||
|
||||
""".encode("utf-8"), content_type="text/csv")
|
||||
|
||||
r = client.post('/control/event/dummy/dummy/orders/import/', {
|
||||
'file': file
|
||||
}, follow=True)
|
||||
doc = BeautifulSoup(r.content, "lxml")
|
||||
assert doc.select("select[name=orders]")
|
||||
assert doc.select("select[name=status]")
|
||||
assert doc.select("select[name=attendee_email]")
|
||||
assert b"Dieter" in r.content
|
||||
assert b"daniel@example.org" in r.content
|
||||
assert b"Anke" not in r.content
|
||||
@@ -115,6 +115,7 @@ event_urls = [
|
||||
"orders/ABC/delete",
|
||||
"orders/ABC/",
|
||||
"orders/",
|
||||
"orders/import/",
|
||||
"checkinlists/",
|
||||
"checkinlists/1/",
|
||||
"checkinlists/1/change",
|
||||
@@ -280,6 +281,8 @@ event_permission_urls = [
|
||||
("can_change_orders", "orders/FOO/delete", 302),
|
||||
("can_change_orders", "orders/FOO/comment", 405),
|
||||
("can_change_orders", "orders/FOO/locale", 200),
|
||||
("can_change_orders", "orders/import/", 200),
|
||||
("can_change_orders", "orders/import/0ab7b081-92d3-4480-82de-2f8b056fd32f/", 404),
|
||||
("can_view_orders", "orders/FOO/answer/5/", 404),
|
||||
("can_change_vouchers", "vouchers/add", 200),
|
||||
("can_change_orders", "requiredactions/", 200),
|
||||
|
||||
Reference in New Issue
Block a user