Files
pretix_original/src/pretix/base/models/organizer.py
Raphael Michel df0b580dd6 Pluggable permissions (#5728)
* Data model draft

* Refactor query and assignment usages of old permissions

* Backend UI

* API serializer

* Big string replace

* Docs, tests and fixes for teams api

* Update docs for device auth

* Eliminate old names

* Make tests pass

* Use new permissions, remove inconsistencies

* Add test for translations

* Show plugin permissions

* Add permission for seating plans

* Fix plugin activation

* Fix failing test

* Refactor to permission groups

* Update doc/api/resources/devices.rst

Co-authored-by: luelista <weller@rami.io>

* Update doc/api/resources/events.rst

Co-authored-by: luelista <weller@rami.io>

* Update src/pretix/api/serializers/organizer.py

Co-authored-by: luelista <weller@rami.io>

* Fix typo

* Fix python version compat

* Replacement after rebase

* Add proper permission handling for exports

* Docs for exporters

* Runtime linting of permission names

* Fix typos

* Show export page even without orders permission

* More legacy compat

* Do not strongly validate before plugins are loaded

* Rebase migration

* Add permission for outgoing mails

* Review notes

* Update doc/api/resources/teams.rst

Co-authored-by: Richard Schreiber <schreiber@pretix.eu>

* Clean up logic around exporters

* Review and failures

* Fix migration leading to forbidden combination

* Handle permissions on event copying

* Remove print-statements

* Make test clearer

* Review feedback

* Add AnyPermissionOf

* migration safety

---------

Co-authored-by: luelista <weller@rami.io>
Co-authored-by: Richard Schreiber <schreiber@pretix.eu>
2026-03-17 14:43:56 +01:00

688 lines
27 KiB
Python

#
# This file is part of pretix (Community Edition).
#
# Copyright (C) 2014-2020 Raphael Michel and contributors
# Copyright (C) 2020-today pretix GmbH and contributors
#
# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General
# Public License as published by the Free Software Foundation in version 3 of the License.
#
# ADDITIONAL TERMS APPLY: Pursuant to Section 7 of the GNU Affero General Public License, additional terms are
# applicable granting you additional permissions and placing additional restrictions on your usage of this software.
# Please refer to the pretix LICENSE file to obtain the full terms applicable to this work. If you did not receive
# this file, see <https://pretix.eu/about/en/license>.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
# <https://www.gnu.org/licenses/>.
#
# This file is based on an earlier version of pretix which was released under the Apache License 2.0. The full text of
# the Apache License 2.0 can be obtained at <http://www.apache.org/licenses/LICENSE-2.0>.
#
# This file may have since been changed and any changes are released under the terms of AGPLv3 as described above. A
# full history of changes and contributors is available at <https://github.com/pretix/pretix>.
#
# This file contains Apache-licensed contributions copyrighted by: Christopher Dambamuromo, Sohalt, Tobias Kunze
#
# Unless required by applicable law or agreed to in writing, software distributed under the Apache License 2.0 is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under the License.
import operator
import string
from datetime import date, datetime, time
from functools import reduce
import pytz_deprecation_shim
from django.conf import settings
from django.core.mail import get_connection
from django.core.validators import MinLengthValidator, RegexValidator
from django.db import models
from django.db.models import Q
from django.urls import reverse
from django.utils.crypto import get_random_string
from django.utils.functional import cached_property
from django.utils.timezone import get_current_timezone, make_aware, now
from django.utils.translation import gettext_lazy as _
from django_scopes import ScopedManager, scope
from i18nfield.fields import I18nCharField
from i18nfield.strings import LazyI18nString
from pretix.base.models.base import LoggedModel
from pretix.base.validators import OrganizerSlugBanlistValidator
from ...helpers.permission_migration import (
OLD_TO_NEW_EVENT_COMPAT, OLD_TO_NEW_ORGANIZER_COMPAT,
LegacyPermissionProperty,
)
from ..settings import settings_hierarkey
from .auth import User
@settings_hierarkey.add(cache_namespace='organizer')
class Organizer(LoggedModel):
    """
    This model represents an entity organizing events, e.g. a company, institution,
    charity, person, …

    :param name: The organizer's name
    :type name: str
    :param slug: A globally unique, short name for this organizer, to be used
                 in URLs and similar places.
    :type slug: str
    :param plugins: A comma-separated list of plugin names that are active for this organizer.
    :type plugins: str
    """
    settings_namespace = 'organizer'
    name = models.CharField(max_length=200,
                            verbose_name=_("Name"))
    slug = models.CharField(
        max_length=50, db_index=True,
        help_text=_(
            "Should be short, only contain lowercase letters, numbers, dots, and dashes. Every slug can only be used "
            "once. This is being used in URLs to refer to your organizer accounts and your events."),
        validators=[
            MinLengthValidator(
                limit_value=2,
            ),
            RegexValidator(
                regex="^[a-zA-Z0-9][a-zA-Z0-9.-]*[a-zA-Z0-9]$",
                message=_("The slug may only contain letters, numbers, dots and dashes.")
            ),
            OrganizerSlugBanlistValidator()
        ],
        verbose_name=_("Short form"),
        unique=True
    )
    plugins = models.TextField(
        verbose_name=_("Plugins"),
        null=False, blank=True, default="",
    )

    class Meta:
        verbose_name = _("Organizer")
        verbose_name_plural = _("Organizers")
        ordering = ("name", "slug")

    def __str__(self) -> str:
        return self.name

    def save(self, *args, **kwargs):
        """
        Saves the organizer. A newly created organizer (no primary key before saving) additionally
        gets its default settings and default sales channels; for an existing organizer the
        organizer cache is cleared instead.
        """
        is_new = not self.pk
        obj = super().save(*args, **kwargs)
        if is_new:
            kwargs.pop('update_fields', None)  # does not make sense here
            self.set_defaults()
            # Sales channels use a scoped manager, so we need an active organizer scope
            with scope(organizer=self):
                self.create_default_sales_channels()
        else:
            self.get_cache().clear()
        return obj

    def set_defaults(self):
        """
        This will be called after organizer creation.
        This way, we can use this to introduce new default settings to pretix that do not affect existing organizers.
        """
        self.settings.cookie_consent = True
        plugins = [p for p in settings.PRETIX_PLUGINS_ORGANIZER_DEFAULT.split(",") if p]
        if plugins and not self.get_plugins():
            # Plugins configured as system-wide defaults are allowed even if they are restricted
            self.set_active_plugins(plugins, allow_restricted=plugins)
            self.save()

    def get_cache(self):
        """
        Returns an :py:class:`ObjectRelatedCache` object. This behaves equivalent to
        Django's built-in cache backends, but puts you into an isolated environment for
        this organizer, so you don't have to prefix your cache keys. In addition, the cache
        is being cleared every time the organizer changes.

        .. deprecated:: 1.9
           Use the property ``cache`` instead.
        """
        return self.cache

    @cached_property
    def cache(self):
        """
        Returns an :py:class:`ObjectRelatedCache` object. This behaves equivalent to
        Django's built-in cache backends, but puts you into an isolated environment for
        this organizer, so you don't have to prefix your cache keys. In addition, the cache
        is being cleared every time the organizer changes.
        """
        from pretix.base.cache import ObjectRelatedCache

        return ObjectRelatedCache(self)

    def get_plugins(self):
        """
        Returns the names of the plugins activated for this organizer as a list.
        """
        if not self.plugins:
            return []
        return self.plugins.split(",")

    def get_available_plugins(self):
        """
        Returns a dictionary mapping module names to plugin metadata for all plugins reported by
        ``get_all_plugins(organizer=self)``, skipping plugins whose name starts with a dot and
        plugins that set ``visible`` to a false value.
        """
        from pretix.base.plugins import get_all_plugins

        return {
            p.module: p for p in get_all_plugins(organizer=self)
            if not p.name.startswith('.') and getattr(p, 'visible', True)
        }

    def set_active_plugins(self, modules, allow_restricted=frozenset()):
        """
        Sets the list of active plugins to ``modules``. For every module that is not yet active but
        is available, the plugin's ``installed`` hook is called (if it has one). Newly enabled plugins
        that are marked as ``restricted`` are dropped unless they are listed in ``allow_restricted``.
        It is the caller's responsibility to save the organizer object.

        :param modules: Iterable of plugin module names that should be active. It is not modified.
        :param allow_restricted: Collection of module names that may be enabled even if restricted.
        """
        # Work on a copy so that dropping restricted plugins never mutates the caller's
        # object; this also allows tuples and other iterables to be passed in.
        modules = list(modules)
        plugins_active = self.get_plugins()
        plugins_available = self.get_available_plugins()

        enable = [m for m in modules if m not in plugins_active and m in plugins_available]

        for module in enable:
            app = plugins_available[module].app
            if getattr(app, 'restricted', False) and module not in allow_restricted:
                modules.remove(module)
            elif hasattr(app, 'installed'):
                app.installed(self)

        self.plugins = ",".join(modules)

    def enable_plugin(self, module, allow_restricted=frozenset()):
        """
        Adds a plugin to the list of plugins, calling its ``installed`` hook (if available).
        It is the caller's responsibility to save the organizer object.
        """
        plugins_active = self.get_plugins()

        if module not in plugins_active:
            plugins_active.append(module)
            self.set_active_plugins(plugins_active, allow_restricted=allow_restricted)

    def disable_plugin(self, module):
        """
        Removes a plugin from the list of plugins, calling its ``uninstalled`` hook (if available).
        It is the caller's responsibility to save the organizer object and, in case of a hybrid organizer-event plugin,
        to remove it from all events.
        """
        plugins_active = self.get_plugins()

        if module in plugins_active:
            plugins_active.remove(module)
            self.set_active_plugins(plugins_active)

            plugins_available = self.get_available_plugins()
            if module in plugins_available and hasattr(plugins_available[module].app, 'uninstalled'):
                plugins_available[module].app.uninstalled(self)

    @property
    def timezone(self):
        """
        Returns a timezone object for the timezone name stored in the ``timezone`` setting.
        """
        return pytz_deprecation_shim.timezone(self.settings.timezone)

    @cached_property
    def all_logentries_link(self):
        """
        Returns the URL of the ``control:organizer.log`` view for this organizer.
        """
        return reverse(
            'control:organizer.log',
            kwargs={
                'organizer': self.slug,
            }
        )

    @property
    def has_gift_cards(self):
        """
        Whether this organizer has issued any gift cards or has at least one active gift card
        issuer acceptance. The result is stored in the organizer cache with ``timeout=15``
        (presumably seconds, as with Django's cache backends).
        """
        return self.cache.get_or_set(
            key='has_gift_cards',
            timeout=15,
            default=lambda: self.issued_gift_cards.exists() or self.gift_card_issuer_acceptance.filter(active=True).exists()
        )

    @property
    def accepted_gift_cards(self):
        """
        Returns a queryset of all gift cards issued by this organizer or by any issuer this
        organizer has an active acceptance for.
        """
        from .giftcards import GiftCard, GiftCardAcceptance

        return GiftCard.objects.filter(
            Q(issuer=self) |
            Q(issuer__in=GiftCardAcceptance.objects.filter(
                acceptor=self,
                active=True,
            ).values_list('issuer', flat=True))
        )

    @property
    def default_gift_card_expiry(self):
        """
        Returns the default expiry date for new gift cards: December 31st, 23:59:59 (in the currently
        active timezone) of the current year plus the ``giftcard_expiry_years`` setting. Returns
        ``None`` if that setting is not set.
        """
        if self.settings.giftcard_expiry_years is not None:
            tz = get_current_timezone()
            return make_aware(datetime.combine(
                date(now().astimezone(tz).year + self.settings.get('giftcard_expiry_years', as_type=int), 12, 31),
                time(hour=23, minute=59, second=59)
            ), tz)
        return None

    def allow_delete(self):
        """
        Returns ``True`` only if no orders and no invoices exist for any event of this organizer
        and the organizer has no devices.
        """
        from . import Invoice, Order

        return (
            not Order.objects.filter(event__organizer=self).exists() and
            not Invoice.objects.filter(event__organizer=self).exists() and
            not self.devices.exists()
        )

    def delete_sub_objects(self):
        """
        Deletes all events of this organizer (including their sub-objects) and all of its teams.
        """
        for e in self.events.all():
            e.delete_sub_objects()
            e.delete()
        self.teams.all().delete()

    def get_mail_backend(self, timeout=None):
        """
        Returns an email server connection, either by using the system-wide connection
        or by returning a custom one based on the organizer's settings.

        :param timeout: Connection timeout, only passed on when a custom SMTP server is configured.
        """
        if self.settings.smtp_use_custom:
            return get_connection(backend=settings.EMAIL_CUSTOM_SMTP_BACKEND,
                                  host=self.settings.smtp_host,
                                  port=self.settings.smtp_port,
                                  username=self.settings.smtp_username,
                                  password=self.settings.smtp_password,
                                  use_tls=self.settings.smtp_use_tls,
                                  use_ssl=self.settings.smtp_use_ssl,
                                  fail_silently=False, timeout=timeout)
        else:
            return get_connection(fail_silently=False)

    def create_default_sales_channels(self):
        """
        Makes sure a sales channel exists for every sales channel type that is flagged as
        ``default_created``. Channels that already exist (matched by identifier) are left untouched.
        """
        from pretix.base.channels import get_all_sales_channel_types

        position = 0
        for channel in get_all_sales_channel_types().values():
            if not channel.default_created:
                continue
            # Only the identifier is used for the lookup. The position is a creation default:
            # if it were part of the lookup, an existing channel that has been moved to a different
            # position would not be found and the subsequent insert would violate the unique
            # constraint on (organizer, identifier).
            self.sales_channels.get_or_create(
                identifier=channel.identifier,
                defaults={
                    'label': LazyI18nString.from_gettext(channel.verbose_name),
                    'type': channel.identifier,
                    'position': position,
                },
            )
            position += 1
def generate_invite_token():
    """
    Returns a random 32-character string of lowercase ASCII letters and digits, used as the
    default value for the token of a team invite.
    """
    alphabet = string.ascii_lowercase + string.digits
    return get_random_string(length=32, allowed_chars=alphabet)
def generate_api_token():
    """
    Returns a random 64-character string of lowercase ASCII letters and digits, used as the
    default value for the token of a team API token.
    """
    alphabet = string.ascii_lowercase + string.digits
    return get_random_string(length=64, allowed_chars=alphabet)
class TeamQuerySet(models.QuerySet):
    """
    Queryset for teams with helpers to select teams by the permissions they grant.
    """

    @classmethod
    def event_permission_q(cls, perm_name):
        """
        Builds a ``Q`` object matching all teams that grant the event permission ``perm_name``,
        either through ``all_event_permissions`` or through an explicit entry in
        ``limit_event_permissions``.

        Names starting with ``can_`` that are listed in ``OLD_TO_NEW_EVENT_COMPAT`` are treated as
        legacy names and translated to the conjunction of their new counterparts. All other names
        are validated with ``assert_valid_event_permission``.
        """
        from ..permissions import assert_valid_event_permission

        is_legacy = perm_name.startswith('can_') and perm_name in OLD_TO_NEW_EVENT_COMPAT
        if is_legacy:
            # A team needs to grant every new permission the legacy name maps to
            conditions = [cls.event_permission_q(new_name) for new_name in OLD_TO_NEW_EVENT_COMPAT[perm_name]]
            return reduce(operator.and_, conditions)

        assert_valid_event_permission(perm_name, allow_legacy=False)
        granted_by_wildcard = Q(all_event_permissions=True)
        granted_explicitly = Q(**{f'limit_event_permissions__{perm_name}': True})
        return granted_by_wildcard | granted_explicitly

    @classmethod
    def organizer_permission_q(cls, perm_name):
        """
        Builds a ``Q`` object matching all teams that grant the organizer permission ``perm_name``,
        either through ``all_organizer_permissions`` or through an explicit entry in
        ``limit_organizer_permissions``.

        Names starting with ``can_`` that are listed in ``OLD_TO_NEW_ORGANIZER_COMPAT`` are treated as
        legacy names and translated to the conjunction of their new counterparts. All other names
        are validated with ``assert_valid_organizer_permission``.
        """
        from ..permissions import assert_valid_organizer_permission

        is_legacy = perm_name.startswith('can_') and perm_name in OLD_TO_NEW_ORGANIZER_COMPAT
        if is_legacy:
            # A team needs to grant every new permission the legacy name maps to
            conditions = [cls.organizer_permission_q(new_name) for new_name in OLD_TO_NEW_ORGANIZER_COMPAT[perm_name]]
            return reduce(operator.and_, conditions)

        assert_valid_organizer_permission(perm_name, allow_legacy=False)
        granted_by_wildcard = Q(all_organizer_permissions=True)
        granted_explicitly = Q(**{f'limit_organizer_permissions__{perm_name}': True})
        return granted_by_wildcard | granted_explicitly

    def with_event_permission(self, perm_name):
        """
        Restricts the queryset to teams granting the event permission ``perm_name``.
        """
        condition = self.event_permission_q(perm_name)
        return self.filter(condition)

    def with_organizer_permission(self, perm_name):
        """
        Restricts the queryset to teams granting the organizer permission ``perm_name``.
        """
        condition = self.organizer_permission_q(perm_name)
        return self.filter(condition)
class Team(LoggedModel):
    """
    A team is a collection of people given certain access rights to one or more events of an organizer.

    :param name: The name of this team
    :type name: str
    :param organizer: The organizer this team belongs to
    :type organizer: Organizer
    :param members: A set of users who belong to this team
    :param all_events: Whether this team has access to all events of this organizer
    :type all_events: bool
    :param limit_events: A set of events this team has access to. Irrelevant if ``all_events`` is ``True``.
    :param all_event_permissions: Whether this team grants every event permission
    :type all_event_permissions: bool
    :param limit_event_permissions: Explicitly granted event permissions, stored as ``{name: True}``
    :type limit_event_permissions: dict
    :param all_organizer_permissions: Whether this team grants every organizer permission
    :type all_organizer_permissions: bool
    :param limit_organizer_permissions: Explicitly granted organizer permissions, stored as ``{name: True}``
    :type limit_organizer_permissions: dict
    """
    organizer = models.ForeignKey(Organizer, related_name="teams", on_delete=models.CASCADE)
    name = models.CharField(max_length=190, verbose_name=_("Team name"))
    members = models.ManyToManyField(User, related_name="teams", verbose_name=_("Team members"))
    require_2fa = models.BooleanField(
        default=False, verbose_name=_("Require all members of this team to use two-factor authentication"),
        help_text=_("If you turn this on, all members of the team will be required to either set up two-factor "
                    "authentication or leave the team. The setting may take a few minutes to become effective for "
                    "all users.")
    )

    # Scope
    all_events = models.BooleanField(default=False, verbose_name=_("All events (including newly created ones)"))
    limit_events = models.ManyToManyField('Event', verbose_name=_("Limit to events"), blank=True)

    # Permissions
    # We store them as {key: True} instead of [key] because otherwise not all lookups we need are supported on SQLite
    all_event_permissions = models.BooleanField(default=False, verbose_name=_("All event permissions"))
    limit_event_permissions = models.JSONField(default=dict, verbose_name=_("Event permissions"))
    all_organizer_permissions = models.BooleanField(default=False, verbose_name=_("All organizer permissions"))
    limit_organizer_permissions = models.JSONField(default=dict, verbose_name=_("Organizer permissions"))

    # Legacy lookups for plugin compatibility
    can_change_event_settings = LegacyPermissionProperty()
    can_change_items = LegacyPermissionProperty()
    can_view_orders = LegacyPermissionProperty()
    can_change_orders = LegacyPermissionProperty()
    can_checkin_orders = LegacyPermissionProperty()
    can_view_vouchers = LegacyPermissionProperty()
    can_change_vouchers = LegacyPermissionProperty()
    can_create_events = LegacyPermissionProperty()
    can_change_organizer_settings = LegacyPermissionProperty()
    can_change_teams = LegacyPermissionProperty()
    can_manage_gift_cards = LegacyPermissionProperty()
    can_manage_customers = LegacyPermissionProperty()
    can_manage_reusable_media = LegacyPermissionProperty()

    objects = TeamQuerySet.as_manager()

    def __str__(self) -> str:
        context = {
            'name': str(self.name),
            'object': str(self.organizer),
        }
        return _("%(name)s on %(object)s") % context

    def event_permission_set(self, include_legacy=True) -> set:
        """
        Returns the set of event permission names (``group:action``) granted by this team.

        :param include_legacy: If ``True``, legacy ``can_…`` names are added as well whenever all
                               new permissions they map to are granted.
        """
        from ..permissions import get_all_event_permission_groups

        grants_all = self.all_event_permissions
        explicit = self.limit_event_permissions
        granted = {
            f"{group.name}:{action}"
            for group in get_all_event_permission_groups().values()
            for action in group.actions
            if grants_all or explicit.get(f"{group.name}:{action}")
        }
        if not include_legacy:
            return granted

        # Add legacy permissions as well for plugin compatibility
        for legacy_name, new_names in OLD_TO_NEW_EVENT_COMPAT.items():
            if grants_all or all(explicit.get(new_name) for new_name in new_names):
                granted.add(legacy_name)
        if "can_change_event_settings" in granted:
            granted.add("can_change_settings")
        return granted

    def organizer_permission_set(self, include_legacy=True) -> set:
        """
        Returns the set of organizer permission names (``group:action``) granted by this team.

        :param include_legacy: If ``True``, legacy ``can_…`` names are added as well whenever all
                               new permissions they map to are granted.
        """
        from ..permissions import get_all_organizer_permission_groups

        grants_all = self.all_organizer_permissions
        explicit = self.limit_organizer_permissions
        granted = {
            f"{group.name}:{action}"
            for group in get_all_organizer_permission_groups().values()
            for action in group.actions
            if grants_all or explicit.get(f"{group.name}:{action}")
        }
        if not include_legacy:
            return granted

        # Add legacy permissions as well for plugin compatibility
        for legacy_name, new_names in OLD_TO_NEW_ORGANIZER_COMPAT.items():
            if grants_all or all(explicit.get(new_name) for new_name in new_names):
                granted.add(legacy_name)
        return granted

    @property
    def can_change_settings(self):  # Legacy compatibility
        return self.can_change_event_settings

    def has_event_permission(self, perm_name):
        """
        Checks whether this team grants the event permission ``perm_name``. Names starting with
        ``can_`` that exist as an attribute of the team are resolved through that attribute (legacy),
        all other names are validated with ``assert_valid_event_permission`` first.
        """
        from ..permissions import assert_valid_event_permission

        is_legacy_name = perm_name.startswith('can_') and hasattr(self, perm_name)
        if is_legacy_name:
            return getattr(self, perm_name)

        assert_valid_event_permission(perm_name, allow_legacy=False)
        explicitly_granted = self.limit_event_permissions.get
        return self.all_event_permissions or explicitly_granted(perm_name, False)

    def has_organizer_permission(self, perm_name):
        """
        Checks whether this team grants the organizer permission ``perm_name``. Names starting with
        ``can_`` that exist as an attribute of the team are resolved through that attribute (legacy),
        all other names are validated with ``assert_valid_organizer_permission`` first.
        """
        from ..permissions import assert_valid_organizer_permission

        is_legacy_name = perm_name.startswith('can_') and hasattr(self, perm_name)
        if is_legacy_name:
            return getattr(self, perm_name)

        assert_valid_organizer_permission(perm_name, allow_legacy=False)
        explicitly_granted = self.limit_organizer_permissions.get
        return self.all_organizer_permissions or explicitly_granted(perm_name, False)

    def permission_for_event(self, event):
        """
        Returns whether this team has access to ``event``: for teams with ``all_events`` the event
        must belong to the team's organizer, otherwise it must be listed in ``limit_events``.
        """
        if not self.all_events:
            return self.limit_events.filter(pk=event.pk).exists()
        return event.organizer_id == self.organizer_id

    @property
    def active_tokens(self):
        """
        Returns a queryset of all tokens of this team that are marked as active.
        """
        return self.tokens.filter(active=True)

    def save(self, **kwargs):
        """
        Validates the permission fields and saves the team.

        :raises TypeError: If one of the ``limit_*_permissions`` fields is not a dictionary or
                           contains any value other than ``True``.
        """
        permission_fields = (self.limit_event_permissions, self.limit_organizer_permissions)
        for permissions in permission_fields:
            if not isinstance(permissions, dict):
                raise TypeError("Permissions must be a dictionary")
        for permissions in permission_fields:
            if any(value is not True for value in permissions.values()):
                raise TypeError("Permissions must only contain True values")
        return super().save(**kwargs)

    class Meta:
        verbose_name = _("Team")
        verbose_name_plural = _("Teams")
class TeamInvite(models.Model):
    """
    A TeamInvite represents someone who has been invited to a team but hasn't accepted the
    invitation yet.

    :param team: The team the person is invited to
    :type team: Team
    :param email: The email the invite has been sent to
    :type email: str
    :param token: The secret required to redeem the invite
    :type token: str
    """
    team = models.ForeignKey(Team, related_name="invites", on_delete=models.CASCADE)
    email = models.EmailField(null=True, blank=True)
    token = models.CharField(default=generate_invite_token, max_length=64, null=True, blank=True)

    def __str__(self) -> str:
        template = _("Invite to team '{team}' for '{email}'")
        return template.format(team=str(self.team), email=self.email)
class TeamAPIToken(models.Model):
    """
    A TeamAPIToken represents an API token that has the same access level as the team it belongs to.

    :param team: The team this token belongs to
    :type team: Team
    :param name: A human-readable name for the token
    :type name: str
    :param active: Whether or not this token is active
    :type active: bool
    :param token: The secret required to submit to the API
    :type token: str
    """
    team = models.ForeignKey(Team, related_name="tokens", on_delete=models.CASCADE)
    name = models.CharField(max_length=190)
    active = models.BooleanField(default=True)
    token = models.CharField(default=generate_api_token, max_length=64)

    def _team_covers_event(self, organizer, event) -> bool:
        """
        Returns whether the token's team has access to ``event``: either the team covers all events
        and belongs to ``organizer``, or the event is listed in the team's ``limit_events``.
        """
        team = self.team
        return (team.all_events and organizer == team.organizer) or (
            event in team.limit_events.all()
        )

    def get_event_permission_set(self, organizer, event) -> set:
        """
        Gets a set of permissions (as strings) that a token holds for a particular event

        :param organizer: The organizer of the event
        :param event: The event to check
        :return: set of permissions
        """
        if not self._team_covers_event(organizer, event):
            return set()
        return self.team.event_permission_set()

    def get_organizer_permission_set(self, organizer) -> set:
        """
        Gets a set of permissions (as strings) that a token holds for a particular organizer

        :param organizer: The organizer to check
        :return: set of permissions
        """
        if self.team.organizer == organizer:
            return self.team.organizer_permission_set()
        return set()

    def has_event_permission(self, organizer, event, perm_name=None, request=None) -> bool:
        """
        Checks if this token is part of a team that grants access of type ``perm_name``
        to the event ``event``.

        :param organizer: The organizer of the event
        :param event: The event to check
        :param perm_name: The permission, e.g. ``event.orders:read``. A tuple or list is accepted as
                          well and matches if any of the listed permissions is granted. If empty,
                          only access to the event itself is checked.
        :param request: This parameter is ignored and only defined for compatibility reasons.
        :return: bool
        """
        if not self._team_covers_event(organizer, event):
            return False
        if isinstance(perm_name, (tuple, list)):
            return any(self.team.has_event_permission(candidate) for candidate in perm_name)
        return not perm_name or self.team.has_event_permission(perm_name)

    def has_organizer_permission(self, organizer, perm_name=None, request=None):
        """
        Checks if this token is part of a team that grants access of type ``perm_name``
        to the organizer ``organizer``.

        :param organizer: The organizer to check
        :param perm_name: The permission, e.g. ``organizer.events:create``. A tuple or list is accepted
                          as well and matches if any of the listed permissions is granted. If empty,
                          only membership of the team in the organizer is checked.
        :param request: This parameter is ignored and only defined for compatibility reasons.
        :return: bool
        """
        belongs_to_organizer = organizer == self.team.organizer
        if not belongs_to_organizer:
            return belongs_to_organizer
        if isinstance(perm_name, (tuple, list)):
            return any(self.team.has_organizer_permission(candidate) for candidate in perm_name)
        return not perm_name or self.team.has_organizer_permission(perm_name)

    def get_events_with_any_permission(self):
        """
        Returns a queryset of events the token has any permissions to.

        :return: Iterable of Events
        """
        if not self.team.all_events:
            return self.team.limit_events.all()
        return self.team.organizer.events.all()

    def get_events_with_permission(self, permission, request=None):
        """
        Returns a queryset of events the token has a specific permissions to.

        :param permission: A permission name, or an ``AnyPermissionOf``, list or tuple of names of
                           which at least one needs to be granted.
        :param request: Ignored, for compatibility with User model
        :return: Iterable of Events
        """
        from pretix.base.permissions import AnyPermissionOf

        is_collection = isinstance(permission, (AnyPermissionOf, list, tuple))
        granted = is_collection and any(self.team.has_event_permission(candidate) for candidate in permission)
        if not granted:
            granted = isinstance(permission, str) and self.team.has_event_permission(permission)

        if granted:
            return self.get_events_with_any_permission()
        return self.team.organizer.events.none()
class OrganizerFooterLink(models.Model):
    """
    A footer link assigned to an organizer.

    :param organizer: The organizer this link belongs to
    :type organizer: Organizer
    :param label: The (translatable) link text
    :param url: The target of the link
    :type url: str
    """
    organizer = models.ForeignKey('Organizer', on_delete=models.CASCADE, related_name='footer_links')
    label = I18nCharField(
        max_length=200,
        verbose_name=_("Link text"),
    )
    url = models.URLField(
        verbose_name=_("Link URL"),
    )

    def delete(self, *args, **kwargs):
        """
        Deletes the link and clears the organizer's cache afterwards.

        :return: The return value of the parent ``delete()`` (deletion count and per-model details)
        """
        # Pass the parent's result through so callers relying on the usual
        # return value of Model.delete() keep working.
        result = super().delete(*args, **kwargs)
        self.organizer.cache.clear()
        return result

    def save(self, *args, **kwargs):
        """
        Saves the link and clears the organizer's cache afterwards.
        """
        result = super().save(*args, **kwargs)
        self.organizer.cache.clear()
        return result
class SalesChannel(LoggedModel):
    """
    A sales channel of an organizer. Every channel has an identifier that is unique within the
    organizer and a type that refers to one of the registered sales channel types.

    :param organizer: The organizer this channel belongs to
    :type organizer: Organizer
    :param label: The (translatable) name of the channel
    :param identifier: Identifier of the channel, unique per organizer
    :type identifier: str
    :param type: Identifier of the sales channel type
    :type type: str
    :param position: Sort position of the channel
    :type position: int
    :param configuration: Additional configuration stored as JSON
    :type configuration: dict
    """
    organizer = models.ForeignKey('Organizer', on_delete=models.CASCADE, related_name='sales_channels')
    label = I18nCharField(
        max_length=200,
        verbose_name=_("Name"),
    )
    identifier = models.CharField(
        verbose_name=_("Identifier"),
        max_length=200,
        validators=[
            RegexValidator(
                regex=r"^[a-zA-Z0-9.\-_]+$",
                message=_("The identifier may only contain letters, numbers, dots, dashes, and underscores."),
            ),
        ],
    )
    type = models.CharField(
        verbose_name=_("Type"),
        max_length=200,
    )
    position = models.PositiveIntegerField(
        default=0,
        verbose_name=_("Position")
    )
    configuration = models.JSONField(default=dict)

    objects = ScopedManager(organizer="organizer")

    class Meta:
        ordering = ("position", "type", "identifier", "id")
        unique_together = ("organizer", "identifier")

    def __str__(self):
        return str(self.label)

    @cached_property
    def type_instance(self):
        """
        Returns the registered sales channel type object for this channel's ``type``.
        """
        from ..channels import get_all_sales_channel_types

        return get_all_sales_channel_types()[self.type]

    @property
    def icon(self):
        """
        Returns the icon of this channel's type.
        """
        channel_type = self.type_instance
        return channel_type.icon

    def allow_delete(self):
        """
        Returns whether this channel may be deleted. Channels of a type that is created by default
        can never be deleted, other channels only as long as no order refers to them.
        """
        from . import Order

        if self.type_instance.default_created:
            return False
        has_orders = Order.objects.filter(sales_channel=self).exists()
        return not has_orders