Files
pretix_cgo/src/pretix/base/modelimport_orders.py

773 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#
# This file is part of pretix (Community Edition).
#
# Copyright (C) 2014-2020 Raphael Michel and contributors
# Copyright (C) 2020-2021 rami.io GmbH and contributors
#
# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General
# Public License as published by the Free Software Foundation in version 3 of the License.
#
# ADDITIONAL TERMS APPLY: Pursuant to Section 7 of the GNU Affero General Public License, additional terms are
# applicable granting you additional permissions and placing additional restrictions on your usage of this software.
# Please refer to the pretix LICENSE file to obtain the full terms applicable to this work. If you did not receive
# this file, see <https://pretix.eu/about/en/license>.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
# <https://www.gnu.org/licenses/>.
#
import datetime
from collections import defaultdict
import pycountry
from django.conf import settings
from django.core.exceptions import ValidationError
from django.core.validators import EmailValidator
from django.db.models import Q
from django.utils import formats
from django.utils.functional import cached_property
from django.utils.translation import (
gettext as _, gettext_lazy, pgettext, pgettext_lazy,
)
from django_countries import countries
from django_countries.fields import Country
from i18nfield.strings import LazyI18nString
from phonenumber_field.phonenumber import to_python
from phonenumbers import SUPPORTED_REGIONS
from pretix.base.forms.questions import guess_country
from pretix.base.modelimport import (
BooleanColumnMixin, DatetimeColumnMixin, DecimalColumnMixin, ImportColumn,
SubeventColumnMixin, i18n_flat,
)
from pretix.base.models import (
Customer, ItemVariation, OrderPosition, Question, QuestionAnswer,
QuestionOption, Seat,
)
from pretix.base.services.pricing import get_price
from pretix.base.settings import (
COUNTRIES_WITH_STATE_IN_ADDRESS, PERSON_NAME_SCHEMES,
)
from pretix.base.signals import order_import_columns
class EmailColumn(ImportColumn):
    """Import column that fills the e-mail address of the order."""
    identifier = 'email'
    verbose_name = gettext_lazy('E-mail address')

    def clean(self, value, previous_values):
        """Validate a non-empty ``value`` as an e-mail address.

        Empty values are returned untouched; ``EmailValidator`` raises
        ``ValidationError`` for anything that is not a valid address.
        """
        if not value:
            return value
        validate_email = EmailValidator()
        validate_email(value)
        return value

    def assign(self, value, order, position, invoice_address, **kwargs):
        """Store the cleaned value in ``order.email``."""
        order.email = value
class PhoneColumn(ImportColumn):
    """Import column that fills the phone number of the order."""
    identifier = 'phone'
    verbose_name = gettext_lazy('Phone number')

    def clean(self, value, previous_values):
        """Parse and validate a phone number.

        The parsing region is the event's ``region`` setting if it is a
        supported region, otherwise the upper-cased first two characters of
        the event locale if those form a supported region, otherwise ``None``.

        Returns the parsed phone number object, or ``value`` unchanged when it
        is empty. Raises ``ValidationError`` if the number cannot be parsed or
        is not valid.
        """
        if not value:
            return value

        event_settings = self.event.settings
        region = None
        if event_settings.region in SUPPORTED_REGIONS:
            region = event_settings.region
        else:
            # Fall back to the language prefix of the locale, e.g. "de" -> "DE".
            locale_region = event_settings.locale[:2].upper()
            if locale_region in SUPPORTED_REGIONS:
                region = locale_region

        parsed = to_python(value, region)
        if parsed and parsed.is_valid():
            return parsed
        raise ValidationError(_('Enter a valid phone number.'))

    def assign(self, value, order, position, invoice_address, **kwargs):
        """Store the cleaned value in ``order.phone``."""
        order.phone = value
class SubeventColumn(SubeventColumnMixin, ImportColumn):
    """Import column that selects the date (subevent) of a position."""
    identifier = 'subevent'
    verbose_name = pgettext_lazy('subevents', 'Date')
    default_value = None

    def clean(self, value, previous_values):
        """Require a value, then delegate resolution to the parent classes.

        Raises ``ValidationError`` when ``value`` is empty.
        """
        if value:
            return super().clean(value, previous_values)
        raise ValidationError(pgettext("subevent", "You need to select a date."))

    def assign(self, value, order, position, invoice_address, **kwargs):
        """Store the cleaned value in ``position.subevent``."""
        position.subevent = value
class ItemColumn(ImportColumn):
    """Import column that selects the product of a position."""
    identifier = 'item'
    verbose_name = gettext_lazy('Product')
    default_value = None

    @cached_property
    def items(self):
        """Active items of the event, loaded once per column instance."""
        return list(self.event.items.filter(active=True))

    def static_choices(self):
        """Return ``(pk as string, display name)`` pairs for all active items."""
        return [(str(product.pk), str(product)) for product in self.items]

    def clean(self, value, previous_values):
        """Resolve ``value`` to exactly one active item.

        An item matches if ``value`` equals its primary key (as a string), its
        non-empty internal name, or any non-empty entry that ``i18n_flat``
        yields for its name. Raises ``ValidationError`` when no item or more
        than one item matches.
        """
        def is_match(product):
            if str(product.pk) == value:
                return True
            if product.internal_name and product.internal_name == value:
                return True
            return any((name and name == value) for name in i18n_flat(product.name))

        candidates = [product for product in self.items if is_match(product)]
        if len(candidates) == 0:
            raise ValidationError(_("No matching product was found."))
        if len(candidates) > 1:
            raise ValidationError(_("Multiple matching products were found."))
        return candidates[0]

    def assign(self, value, order, position, invoice_address, **kwargs):
        """Store the cleaned value in ``position.item``."""
        position.item = value
class Variation(ImportColumn):
    """Import column that selects the product variation of a position."""
    identifier = 'variation'
    verbose_name = gettext_lazy('Product variation')

    @cached_property
    def items(self):
        """Active variations of active items of this event, loaded once per instance."""
        return list(ItemVariation.objects.filter(
            active=True, item__active=True, item__event=self.event
        ).select_related('item'))

    def static_choices(self):
        """Return ``(pk as string, "<item> <variation>")`` pairs for all variations."""
        return [
            (str(p.pk), '{} {}'.format(p.item, p.value)) for p in self.items
        ]

    def clean(self, value, previous_values):
        """Resolve ``value`` to exactly one variation of the selected item.

        A variation matches if ``value`` equals its primary key (as a string)
        or any non-empty entry that ``i18n_flat`` yields for its value, AND it
        belongs to the item stored in ``previous_values['item']``.

        Raises ``ValidationError`` when no variation or several variations
        match, or when no value is given although the selected item has
        variations. Returns the matching variation, or the empty ``value``
        unchanged when none is required.
        """
        if value:
            matches = [
                p for p in self.items
                # The parentheses are essential: without them ``and`` binds
                # tighter than ``or``, so a primary-key match would bypass the
                # item check and accept a variation of a different product.
                if (
                    str(p.pk) == value
                    or any((v and v == value) for v in i18n_flat(p.value))
                ) and p.item_id == previous_values['item'].pk
            ]
            if len(matches) == 0:
                raise ValidationError(_("No matching variation was found."))
            if len(matches) > 1:
                raise ValidationError(_("Multiple matching variations were found."))
            return matches[0]
        elif previous_values['item'].variations.exists():
            raise ValidationError(_("You need to select a variation for this product."))
        return value

    def assign(self, value, order, position, invoice_address, **kwargs):
        """Store the cleaned value in ``position.variation``."""
        position.variation = value
class InvoiceAddressCompany(ImportColumn):
    """Import column for the company name of the invoice address."""
    identifier = 'invoice_address_company'

    @property
    def verbose_name(self):
        """Translated column title, built at access time."""
        section = _('Invoice address')
        field = _('Company')
        return section + ': ' + field

    def assign(self, value, order, position, invoice_address, **kwargs):
        """Set the company name and flag the address as business if one is given."""
        invoice_address.company = value if value else ''
        invoice_address.is_business = bool(value)
class InvoiceAddressNamePart(ImportColumn):
    """Import column for a single part of the invoice address name.

    ``key`` is the name-part key written into ``invoice_address.name_parts``
    and ``label`` is its human-readable title.
    """

    def __init__(self, event, key, label):
        # Set key/label before calling the parent constructor so that the
        # ``identifier``/``verbose_name`` properties work if it reads them.
        self.key = key
        self.label = label
        super().__init__(event)

    @property
    def verbose_name(self):
        """Translated column title including the name-part label."""
        section = _('Invoice address')
        return section + ': ' + str(self.label)

    @property
    def identifier(self):
        """Column identifier derived from the name-part key."""
        template = 'invoice_address_name_{}'
        return template.format(self.key)

    def assign(self, value, order, position, invoice_address, **kwargs):
        """Store the value (or an empty string) under this part's key."""
        invoice_address.name_parts[self.key] = value if value else ''
class InvoiceAddressStreet(ImportColumn):
    """Import column for the street of the invoice address."""
    identifier = 'invoice_address_street'

    @property
    def verbose_name(self):
        """Translated column title, built at access time."""
        section = _('Invoice address')
        field = _('Address')
        return section + ': ' + field

    def assign(self, value, order, position, invoice_address, **kwargs):
        """Store the value (or an empty string) in ``invoice_address.street``."""
        invoice_address.street = value if value else ''
class InvoiceAddressZip(ImportColumn):
    """Import column for the ZIP code of the invoice address."""
    identifier = 'invoice_address_zipcode'

    @property
    def verbose_name(self):
        """Translated column title, built at access time."""
        section = _('Invoice address')
        field = _('ZIP code')
        return section + ': ' + field

    def assign(self, value, order, position, invoice_address, **kwargs):
        """Store the value (or an empty string) in ``invoice_address.zipcode``."""
        invoice_address.zipcode = value if value else ''
class InvoiceAddressCity(ImportColumn):
    """Import column for the city of the invoice address."""
    identifier = 'invoice_address_city'

    @property
    def verbose_name(self):
        """Translated column title, built at access time."""
        section = _('Invoice address')
        field = _('City')
        return section + ': ' + field

    def assign(self, value, order, position, invoice_address, **kwargs):
        """Store the value (or an empty string) in ``invoice_address.city``."""
        invoice_address.city = value if value else ''
class InvoiceAddressCountry(ImportColumn):
    """Import column for the country of the invoice address."""
    identifier = 'invoice_address_country'
    default_value = None

    @property
    def initial(self):
        """Initial selection: a static value with the country guessed for the event."""
        guessed = guess_country(self.event)
        return 'static:' + str(guessed)

    @property
    def verbose_name(self):
        """Translated column title, built at access time."""
        section = _('Invoice address')
        field = _('Country')
        return section + ': ' + field

    def static_choices(self):
        """Return all entries of ``django_countries.countries`` as a list."""
        return list(countries)

    def clean(self, value, previous_values):
        """Accept empty values and known country codes.

        A code is known if ``Country(value)`` has a numeric code or the value
        is listed in ``settings.COUNTRIES_OVERRIDE``. Anything else raises
        ``ValidationError``.
        """
        if not value:
            return value
        if Country(value).numeric or value in settings.COUNTRIES_OVERRIDE:
            return value
        raise ValidationError(_("Please enter a valid country code."))

    def assign(self, value, order, position, invoice_address, **kwargs):
        """Store the value (or an empty string) in ``invoice_address.country``."""
        invoice_address.country = value if value else ''
class InvoiceAddressState(ImportColumn):
    """Import column for the state of the invoice address."""
    identifier = 'invoice_address_state'

    @property
    def verbose_name(self):
        """Translated column title, built at access time."""
        section = _('Invoice address')
        field = pgettext('address', 'State')
        return section + ': ' + field

    def clean(self, value, previous_values):
        """Resolve a state name or code to its short subdivision code.

        The country is read from ``previous_values['invoice_address_country']``.
        A subdivision matches if its type is one of the types configured for
        that country in ``COUNTRIES_WITH_STATE_IN_ADDRESS`` and ``value``
        equals either its name or its code without the first three characters
        (presumably the "CC-" country prefix).

        Returns ``None`` for empty input. Raises ``ValidationError`` if the
        country does not support states or no subdivision matches.
        """
        if not value:
            return None

        country = previous_values.get('invoice_address_country')
        if country not in COUNTRIES_WITH_STATE_IN_ADDRESS:
            raise ValidationError(_("States are not supported for this country."))

        types, _form_unused = COUNTRIES_WITH_STATE_IN_ADDRESS[country]
        candidates = []
        for subdivision in pycountry.subdivisions.get(country_code=country):
            if subdivision.type not in types:
                continue
            if subdivision.code[3:] == value or subdivision.name == value:
                candidates.append(subdivision)

        if len(candidates) == 0:
            raise ValidationError(_("Please enter a valid state."))
        return candidates[0].code[3:]

    def assign(self, value, order, position, invoice_address, **kwargs):
        """Store the value (or an empty string) in ``invoice_address.state``."""
        invoice_address.state = value if value else ''
class InvoiceAddressVATID(ImportColumn):
    """Import column for the VAT ID of the invoice address."""
    identifier = 'invoice_address_vat_id'

    @property
    def verbose_name(self):
        """Translated column title, built at access time."""
        section = _('Invoice address')
        field = _('VAT ID')
        return section + ': ' + field

    def assign(self, value, order, position, invoice_address, **kwargs):
        """Store the value (or an empty string) in ``invoice_address.vat_id``."""
        invoice_address.vat_id = value if value else ''
class InvoiceAddressReference(ImportColumn):
    """Import column for the internal reference of the invoice address."""
    identifier = 'invoice_address_internal_reference'

    @property
    def verbose_name(self):
        """Translated column title, built at access time."""
        section = _('Invoice address')
        field = _('Internal reference')
        return section + ': ' + field

    def assign(self, value, order, position, invoice_address, **kwargs):
        """Store the value (or an empty string) in ``invoice_address.internal_reference``."""
        invoice_address.internal_reference = value if value else ''
class AttendeeNamePart(ImportColumn):
    """Import column for a single part of the attendee name.

    ``key`` is the name-part key written into ``position.attendee_name_parts``
    and ``label`` is its human-readable title.
    """

    def __init__(self, event, key, label):
        # Set key/label before calling the parent constructor so that the
        # ``identifier``/``verbose_name`` properties work if it reads them.
        self.key = key
        self.label = label
        super().__init__(event)

    @property
    def verbose_name(self):
        """Translated column title including the name-part label."""
        section = _('Attendee name')
        return section + ': ' + str(self.label)

    @property
    def identifier(self):
        """Column identifier derived from the name-part key."""
        template = 'attendee_name_{}'
        return template.format(self.key)

    def assign(self, value, order, position, invoice_address, **kwargs):
        """Store the value (or an empty string) under this part's key."""
        position.attendee_name_parts[self.key] = value if value else ''
class AttendeeEmail(ImportColumn):
    """Import column for the attendee's e-mail address."""
    identifier = 'attendee_email'
    verbose_name = gettext_lazy('Attendee e-mail address')

    def clean(self, value, previous_values):
        """Validate a non-empty ``value`` as an e-mail address.

        Empty values are returned untouched; ``EmailValidator`` raises
        ``ValidationError`` for anything that is not a valid address.
        """
        if not value:
            return value
        validate_email = EmailValidator()
        validate_email(value)
        return value

    def assign(self, value, order, position, invoice_address, **kwargs):
        """Store the cleaned value in ``position.attendee_email``."""
        position.attendee_email = value
class AttendeeCompany(ImportColumn):
    """Import column for the company of the attendee address."""
    identifier = 'attendee_company'

    @property
    def verbose_name(self):
        """Translated column title, built at access time."""
        section = _('Attendee address')
        field = _('Company')
        return section + ': ' + field

    def assign(self, value, order, position, invoice_address, **kwargs):
        """Store the value (or an empty string) in ``position.company``."""
        position.company = value if value else ''
class AttendeeStreet(ImportColumn):
    """Import column for the street of the attendee address."""
    identifier = 'attendee_street'

    @property
    def verbose_name(self):
        """Translated column title, built at access time."""
        section = _('Attendee address')
        field = _('Address')
        return section + ': ' + field

    def assign(self, value, order, position, invoice_address, **kwargs):
        """Store the value (or an empty string) in ``position.street``."""
        position.street = value if value else ''
class AttendeeZip(ImportColumn):
    """Import column for the ZIP code of the attendee address."""
    identifier = 'attendee_zipcode'

    @property
    def verbose_name(self):
        """Translated column title, built at access time."""
        section = _('Attendee address')
        field = _('ZIP code')
        return section + ': ' + field

    def assign(self, value, order, position, invoice_address, **kwargs):
        """Store the value (or an empty string) in ``position.zipcode``."""
        position.zipcode = value if value else ''
class AttendeeCity(ImportColumn):
    """Import column for the city of the attendee address."""
    identifier = 'attendee_city'

    @property
    def verbose_name(self):
        """Translated column title, built at access time."""
        section = _('Attendee address')
        field = _('City')
        return section + ': ' + field

    def assign(self, value, order, position, invoice_address, **kwargs):
        """Store the value (or an empty string) in ``position.city``."""
        position.city = value if value else ''
class AttendeeCountry(ImportColumn):
    """Import column for the country of the attendee address."""
    identifier = 'attendee_country'
    default_value = None

    @property
    def initial(self):
        """Initial selection: a static value with the country guessed for the event."""
        guessed = guess_country(self.event)
        return 'static:' + str(guessed)

    @property
    def verbose_name(self):
        """Translated column title, built at access time."""
        section = _('Attendee address')
        field = _('Country')
        return section + ': ' + field

    def static_choices(self):
        """Return all entries of ``django_countries.countries`` as a list."""
        return list(countries)

    def clean(self, value, previous_values):
        """Accept empty values and known country codes.

        A code is known if ``Country(value)`` has a numeric code or the value
        is listed in ``settings.COUNTRIES_OVERRIDE``. Anything else raises
        ``ValidationError``.
        """
        if not value:
            return value
        if Country(value).numeric or value in settings.COUNTRIES_OVERRIDE:
            return value
        raise ValidationError(_("Please enter a valid country code."))

    def assign(self, value, order, position, invoice_address, **kwargs):
        """Store the value (or an empty string) in ``position.country``."""
        position.country = value if value else ''
class AttendeeState(ImportColumn):
    """Import column for the state of the attendee address."""
    identifier = 'attendee_state'

    @property
    def verbose_name(self):
        """Translated column title, built at access time."""
        section = _('Attendee address')
        field = _('State')
        return section + ': ' + field

    def clean(self, value, previous_values):
        """Resolve a state name or code to its short subdivision code.

        The country is read from ``previous_values['attendee_country']``. A
        subdivision matches if its type is one of the types configured for
        that country in ``COUNTRIES_WITH_STATE_IN_ADDRESS`` and ``value``
        equals either its name or its code without the first three characters
        (presumably the "CC-" country prefix).

        Returns ``None`` for empty input. Raises ``ValidationError`` if the
        country does not support states or no subdivision matches.
        """
        if not value:
            return None

        country = previous_values.get('attendee_country')
        if country not in COUNTRIES_WITH_STATE_IN_ADDRESS:
            raise ValidationError(_("States are not supported for this country."))

        types, _form_unused = COUNTRIES_WITH_STATE_IN_ADDRESS[country]
        candidates = []
        for subdivision in pycountry.subdivisions.get(country_code=country):
            if subdivision.type not in types:
                continue
            if subdivision.code[3:] == value or subdivision.name == value:
                candidates.append(subdivision)

        if len(candidates) == 0:
            raise ValidationError(_("Please enter a valid state."))
        return candidates[0].code[3:]

    def assign(self, value, order, position, invoice_address, **kwargs):
        """Store the value (or an empty string) in ``position.state``."""
        position.state = value if value else ''
class Price(DecimalColumnMixin, ImportColumn):
    """Import column for the price of a position."""
    identifier = 'price'
    verbose_name = gettext_lazy('Price')
    default_label = gettext_lazy('Calculate from product')

    def assign(self, value, order, position, invoice_address, **kwargs):
        """Compute the position's price and tax fields via ``get_price``.

        When ``value`` is ``None`` the price is calculated from the position's
        item, variation, voucher and subevent; otherwise ``value`` is passed
        as a forced custom price. The gross price, tax rate and tax value of
        the result are copied to the position, and the tax rule is taken from
        the item.
        """
        pricing_options = {
            'subevent': position.subevent,
            'invoice_address': invoice_address,
        }
        if value is not None:
            pricing_options['custom_price'] = value
            pricing_options['force_custom_price'] = True

        computed = get_price(
            position.item, position.variation, position.voucher,
            **pricing_options
        )
        position.price = computed.gross
        position.tax_rule = position.item.tax_rule
        position.tax_rate = computed.rate
        position.tax_value = computed.tax
class Secret(ImportColumn):
    """Import column for the ticket secret of a position."""
    identifier = 'secret'
    verbose_name = gettext_lazy('Ticket code')
    default_label = gettext_lazy('Generate automatically')

    def __init__(self, *args):
        # Values already seen by this column instance, used to detect
        # duplicates among the rows processed by it.
        self._cached = set()
        super().__init__(*args)

    def clean(self, value, previous_values):
        """Reject secrets that are already in use.

        A non-empty secret is rejected with ``ValidationError`` if this column
        instance has seen it before or if an order position with that secret
        exists anywhere within the event's organizer. Every accepted value
        (including empty ones) is remembered.
        """
        if value:
            already_used = value in self._cached or OrderPosition.all.filter(
                order__event__organizer=self.event.organizer, secret=value
            ).exists()
            if already_used:
                raise ValidationError(
                    _('You cannot assign a position secret that already exists.')
                )
        self._cached.add(value)
        return value

    def assign(self, value, order, position, invoice_address, **kwargs):
        """Set ``position.secret`` only when a value was provided."""
        if not value:
            return
        position.secret = value
class Locale(ImportColumn):
    """Import column for the locale of the order."""
    identifier = 'locale'
    verbose_name = gettext_lazy('Order locale')
    default_value = None

    @property
    def initial(self):
        """Initial selection: a static value with the event's default locale."""
        default_locale = self.event.settings.locale
        return 'static:' + default_locale

    def static_choices(self):
        """Return ``(code, language name)`` pairs for the event's enabled locales."""
        names_by_code = dict(settings.LANGUAGES)
        choices = []
        for code in self.event.settings.locales:
            choices.append((code, names_by_code[code]))
        return choices

    def clean(self, value, previous_values):
        """Normalise and validate a language code.

        Empty input falls back to the event's default locale. String values
        are lower-cased before being checked against the event's enabled
        locales; unknown codes raise ``ValidationError``.
        """
        value = value or self.event.settings.locale
        if isinstance(value, str):
            value = value.lower()
        if value in self.event.settings.locales:
            return value
        raise ValidationError(_("Please enter a valid language code."))

    def assign(self, value, order, position, invoice_address, **kwargs):
        """Store the cleaned value in ``order.locale``."""
        order.locale = value
class ValidFrom(DatetimeColumnMixin, ImportColumn):
    """Import column for the start of a position's validity period."""
    identifier = 'valid_from'
    verbose_name = gettext_lazy('Valid from')

    def assign(self, value, order, position, invoice_address, **kwargs):
        """Store the cleaned value in ``position.valid_from``."""
        position.valid_from = value
class ValidUntil(DatetimeColumnMixin, ImportColumn):
    """Import column for the end of a position's validity period."""
    identifier = 'valid_until'
    verbose_name = gettext_lazy('Valid until')

    def assign(self, value, order, position, invoice_address, **kwargs):
        """Store the cleaned value in ``position.valid_until``."""
        position.valid_until = value
class Expires(DatetimeColumnMixin, ImportColumn):
    """Import column for the expiry date of the order."""
    identifier = 'expires'
    verbose_name = gettext_lazy('Expiry date')

    def clean(self, value, previous_values):
        """Parse an expiry value given either as a date or as a datetime.

        Each localized ``DATE_INPUT_FORMATS`` entry is tried first; on the
        first match the result is set to 23:59:59 with ``self.timezone`` as
        its tzinfo. If no date format matches, parsing is delegated to the
        parent classes. Empty input yields ``None``.
        """
        if not value:
            return None

        date_formats = formats.get_format('DATE_INPUT_FORMATS', use_l10n=True)
        for date_format in date_formats:
            try:
                parsed = datetime.datetime.strptime(value, date_format)
                # A bare date means "until the end of that day".
                return parsed.replace(tzinfo=self.timezone, hour=23, minute=59, second=59)
            except (ValueError, TypeError):
                continue

        # Not a plain date: let the parent classes parse it.
        return super().clean(value, previous_values)

    def assign(self, value, order, position, invoice_address, **kwargs):
        """Set ``order.expires`` only when a value was provided."""
        if not value:
            return
        order.expires = value
class Saleschannel(ImportColumn):
    """Import column for the sales channel of the order."""
    identifier = 'sales_channel'
    verbose_name = gettext_lazy('Sales channel')
    default_value = None
    initial = 'static:web'

    @cached_property
    def channels(self):
        """All sales channels of the event's organizer, loaded once per instance."""
        return list(self.event.organizer.sales_channels.all())

    def static_choices(self):
        """Return ``(identifier, label)`` pairs for all sales channels."""
        return [(channel.identifier, str(channel.label)) for channel in self.channels]

    def clean(self, value, previous_values):
        """Resolve ``value`` to exactly one sales channel.

        A channel matches if ``value`` equals its identifier or any non-empty
        entry that ``i18n_flat`` yields for its label. Zero matches and
        multiple matches both raise the same ``ValidationError``.
        """
        def is_match(channel):
            if channel.identifier == value:
                return True
            return any((text and text == value) for text in i18n_flat(channel.label))

        candidates = [channel for channel in self.channels if is_match(channel)]
        if len(candidates) != 1:
            raise ValidationError(_("Please enter a valid sales channel."))
        return candidates[0]

    def assign(self, value, order, position, invoice_address, **kwargs):
        """Store the cleaned value in ``order.sales_channel``."""
        order.sales_channel = value
class SeatColumn(ImportColumn):
    """Import column that assigns a seat to a position."""
    identifier = 'seat'
    verbose_name = gettext_lazy('Seat ID')

    def __init__(self, *args):
        # Seats already handed out by this column instance.
        self._cached = set()
        super().__init__(*args)

    def clean(self, value, previous_values):
        """Resolve a seat GUID to an available ``Seat``.

        The seat is looked up by event, GUID and the subevent found in
        ``previous_values``. Raises ``ValidationError`` if the lookup is
        ambiguous or empty, if the seat is not available or was already used
        by this column instance, or if no seat is given although the selected
        item has seat category mappings for the subevent.

        Returns the ``Seat``, or the empty ``value`` unchanged when no seat is
        required.
        """
        subevent = previous_values.get('subevent')

        if not value:
            mappings = previous_values['item'].seat_category_mappings.filter(subevent=subevent)
            if mappings.exists():
                raise ValidationError(_('You need to select a specific seat.'))
            return value

        try:
            seat = Seat.objects.get(
                event=self.event,
                seat_guid=value,
                subevent=subevent,
            )
        except Seat.MultipleObjectsReturned:
            raise ValidationError(_('Multiple matching seats were found.'))
        except Seat.DoesNotExist:
            raise ValidationError(_('No matching seat was found.'))

        if not seat.is_available() or seat in self._cached:
            raise ValidationError(
                _('The seat you selected has already been taken. Please select a different seat.'))
        self._cached.add(seat)
        return seat

    def assign(self, value, order, position, invoice_address, **kwargs):
        """Store the cleaned value in ``position.seat``."""
        position.seat = value
class Comment(ImportColumn):
    """Import column for the comment of the order."""
    identifier = 'comment'
    verbose_name = gettext_lazy('Comment')

    def assign(self, value, order, position, invoice_address, **kwargs):
        """Store the value (or an empty string) in ``order.comment``."""
        order.comment = value if value else ''
class CheckinAttentionColumn(BooleanColumnMixin, ImportColumn):
    """Import column for the "requires special attention" flag of the order."""
    identifier = 'checkin_attention'
    verbose_name = gettext_lazy('Requires special attention')

    def assign(self, value, order, position, invoice_address, **kwargs):
        """Store the cleaned value in ``order.checkin_attention``."""
        order.checkin_attention = value
class CheckinTextColumn(ImportColumn):
    """Import column for the check-in text of the order."""
    identifier = 'checkin_text'
    verbose_name = gettext_lazy('Check-in text')

    def assign(self, value, order, position, invoice_address, **kwargs):
        """Store the value in ``order.checkin_text``."""
        order.checkin_text = value
class QuestionColumn(ImportColumn):
    """Import column that records the answer to one question ``q``."""

    def __init__(self, event, q):
        self.q = q
        # Maps every accepted spelling of an option (its id as a string, its
        # identifier, and each stripped answer text) to the set of options it
        # may refer to. A set with more than one entry marks an ambiguous key.
        self.option_resolve_cache = defaultdict(set)
        for opt in q.options.all():
            answer = opt.answer
            if not isinstance(answer, LazyI18nString):
                texts = [answer]
            elif isinstance(answer.data, dict):
                texts = answer.data.values()
            else:
                texts = [answer.data]

            keys = [str(opt.id), opt.identifier]
            keys.extend(text.strip() for text in texts)
            for key in keys:
                self.option_resolve_cache[key].add(opt)
        super().__init__(event)

    @property
    def verbose_name(self):
        """Translated column title including the question text."""
        section = _('Question')
        return section + ': ' + str(self.q.question)

    @property
    def identifier(self):
        """Column identifier derived from the question's primary key."""
        template = 'question_{}'
        return template.format(self.q.pk)

    def clean(self, value, previous_values):
        """Convert a raw cell value into an answer for this question.

        * Single choice: ``value`` must be a known, unambiguous key of the
          option cache; the option is returned.
        * Multiple choice: ``value`` is split on commas, each part is stripped
          and resolved like a single choice; a list of options is returned.
        * Any other question type: delegated to ``self.q.clean_answer``.

        Empty input yields ``None``. Unknown or ambiguous options raise
        ``ValidationError``.
        """
        if not value:
            return None

        cache = self.option_resolve_cache
        question_type = self.q.type

        if question_type == Question.TYPE_CHOICE:
            # Membership test first: indexing a defaultdict would create keys.
            if value not in cache:
                raise ValidationError(_('Invalid option selected.'))
            if len(cache[value]) > 1:
                raise ValidationError(_('Ambiguous option selected.'))
            return next(iter(cache[value]))

        if question_type == Question.TYPE_CHOICE_MULTIPLE:
            keys = [part.strip() for part in value.split(',')]
            if any(key not in cache for key in keys):
                raise ValidationError(_('Invalid option selected.'))
            if any(len(cache[key]) > 1 for key in keys):
                raise ValidationError(_('Ambiguous option selected.'))
            return [next(iter(cache[key])) for key in keys]

        return self.q.clean_answer(value)

    def assign(self, value, order, position, invoice_address, **kwargs):
        """Queue an unsaved ``QuestionAnswer`` on ``order._answers``.

        Nothing happens for ``None``. For a single option or a list of options
        the selected options are kept on the answer as ``_options`` so that
        ``save`` can attach them after the answer has been stored.
        """
        if value is None:
            return
        if not hasattr(order, '_answers'):
            order._answers = []

        if isinstance(value, QuestionOption):
            text, selected = str(value), [value]
        elif isinstance(value, list):
            text, selected = ', '.join(str(v) for v in value), value
        else:
            text, selected = str(value), None

        answer = QuestionAnswer(orderposition=position, question=self.q, answer=text)
        if selected is not None:
            answer._options = selected
        order._answers.append(answer)

    def save(self, order):
        """Persist all answers queued on ``order`` and attach their options.

        NOTE(review): ``order._answers`` is filled by every ``QuestionColumn``
        and is not filtered by question here, so if ``save`` is called once
        per column each answer is saved repeatedly -- confirm against the
        caller.
        """
        pending = getattr(order, '_answers', [])
        for answer in pending:
            answer.orderposition = answer.orderposition  # This is apparently required after save() again
            answer.save()
            if hasattr(answer, '_options'):
                answer.options.add(*answer._options)
class CustomerColumn(ImportColumn):
    """Import column that links the order to a customer account."""
    identifier = 'customer'
    verbose_name = gettext_lazy('Customer')

    def clean(self, value, previous_values):
        """Resolve ``value`` to a customer of the event's organizer.

        A customer matches by identifier, by e-mail address (case-insensitive)
        or by external identifier. If several customers match, only an exact
        identifier match is accepted.

        Returns the ``Customer``, or the empty ``value`` unchanged. Raises
        ``ValidationError`` if no customer matches, or if several match and
        none of them matches by identifier.
        """
        if not value:
            return value

        customers = self.event.organizer.customers
        try:
            return customers.get(
                Q(identifier=value) | Q(email__iexact=value) | Q(external_identifier=value)
            )
        except Customer.MultipleObjectsReturned:
            # Disambiguate by identifier. This lookup can itself come up empty
            # (e.g. two customers matched by e-mail and external identifier),
            # and an exception raised inside an ``except`` handler is not
            # caught by sibling handlers, so it needs its own try block.
            try:
                return customers.get(Q(identifier=value))
            except Customer.DoesNotExist:
                raise ValidationError(_('Multiple matching customers were found.'))
        except Customer.DoesNotExist:
            raise ValidationError(_('No matching customer was found.'))

    def assign(self, value, order, position, invoice_address, **kwargs):
        """Store the cleaned value in ``order.customer``."""
        order.customer = value
def get_order_import_columns(event):
    """Build the ordered list of import columns available for ``event``.

    The list contains the built-in columns (the subevent column only if the
    event has subevents, the customer column only if customer accounts are
    enabled), one name-part column per field of the event's name scheme for
    both the invoice address and the attendee, one column per question that is
    not a file question, and finally everything returned by receivers of the
    ``order_import_columns`` signal.
    """
    columns = []
    if event.has_subevents:
        columns.append(SubeventColumn(event))

    columns.extend([
        EmailColumn(event),
        PhoneColumn(event),
        ItemColumn(event),
        Variation(event),
        InvoiceAddressCompany(event),
    ])

    if event.settings.customer_accounts:
        columns.append(CustomerColumn(event))

    scheme = PERSON_NAME_SCHEMES.get(event.settings.name_scheme)
    # Each scheme field is a 3-tuple; only the key and the label are used here.
    for key, label, _width in scheme['fields']:
        columns.append(InvoiceAddressNamePart(event, key, label))

    columns.extend([
        InvoiceAddressStreet(event),
        InvoiceAddressZip(event),
        InvoiceAddressCity(event),
        InvoiceAddressCountry(event),
        InvoiceAddressState(event),
        InvoiceAddressVATID(event),
        InvoiceAddressReference(event),
    ])

    for key, label, _width in scheme['fields']:
        columns.append(AttendeeNamePart(event, key, label))

    columns.extend([
        AttendeeEmail(event),
        AttendeeCompany(event),
        AttendeeStreet(event),
        AttendeeZip(event),
        AttendeeCity(event),
        AttendeeCountry(event),
        AttendeeState(event),
        Price(event),
        Secret(event),
        SeatColumn(event),
        ValidFrom(event),
        ValidUntil(event),
        Locale(event),
        Saleschannel(event),
        CheckinAttentionColumn(event),
        CheckinTextColumn(event),
        Expires(event),
        Comment(event),
    ])

    questions = event.questions.prefetch_related('options').exclude(type=Question.TYPE_FILE)
    for question in questions:
        columns.append(QuestionColumn(event, question))

    # Signal receivers may contribute additional columns.
    for _receiver, response in order_import_columns.send(sender=event):
        columns += response
    return columns