Exporters: Give access to authentication infos and allow empty permissions (#5979)

* Exporters: Give access to authentication infos

* Allow exporters to have empty permission

* Use a protocol
This commit is contained in:
Raphael Michel
2026-04-02 15:44:36 +02:00
committed by GitHub
parent 84e12fea32
commit d411c36414
5 changed files with 50 additions and 8 deletions

View File

@@ -47,6 +47,7 @@ from django.utils.formats import localize
from django.utils.translation import gettext, gettext_lazy as _ from django.utils.translation import gettext, gettext_lazy as _
from pretix.base.models import Event from pretix.base.models import Event
from pretix.base.models.auth import PermissionHolder
from pretix.helpers.safe_openpyxl import ( # NOQA: backwards compatibility for plugins using excel_safe from pretix.helpers.safe_openpyxl import ( # NOQA: backwards compatibility for plugins using excel_safe
SafeWorkbook, remove_invalid_excel_chars as excel_safe, SafeWorkbook, remove_invalid_excel_chars as excel_safe,
) )
@@ -59,11 +60,20 @@ class BaseExporter:
This is the base class for all data exporters This is the base class for all data exporters
""" """
def __init__(self, event, organizer, progress_callback=lambda v: None): def __init__(self, event, organizer, permission_holder: PermissionHolder=None, progress_callback=lambda v: None):
"""
:param event: Event context, can also be a queryset of events for multi-event exports
:param organizer: Organizer context
:param user: The user who triggered the export (or None).
:param token: The API token that triggered the export (or None).
:param device: The device that triggered the export (or None)
:param progress_callback: Callback function with progress
"""
self.event = event self.event = event
self.organizer = organizer self.organizer = organizer
self.progress_callback = progress_callback self.progress_callback = progress_callback
self.is_multievent = isinstance(event, QuerySet) self.is_multievent = isinstance(event, QuerySet)
self.permission_holder = permission_holder
if isinstance(event, QuerySet): if isinstance(event, QuerySet):
self.events = event self.events = event
self.event = None self.event = None
@@ -180,7 +190,7 @@ class BaseExporter:
return True return True
@classmethod @classmethod
def get_required_event_permission(cls) -> str: def get_required_event_permission(cls) -> Optional[str]:
""" """
The permission level required to use this exporter for events. For multi-event-exports, this will be used The permission level required to use this exporter for events. For multi-event-exports, this will be used
to limit the selection of events. Will be ignored if the ``OrganizerLevelExportMixin`` mixin is used. to limit the selection of events. Will be ignored if the ``OrganizerLevelExportMixin`` mixin is used.
@@ -195,7 +205,7 @@ class OrganizerLevelExportMixin:
raise TypeError("required_event_permission may not be called on OrganizerLevelExportMixin") raise TypeError("required_event_permission may not be called on OrganizerLevelExportMixin")
@classmethod @classmethod
def get_required_organizer_permission(cls) -> str: def get_required_organizer_permission(cls) -> Optional[str]:
""" """
The permission level required to use this exporter. Must be set for organizer-level exports. Set to `None` to The permission level required to use this exporter. Must be set for organizer-level exports. Set to `None` to
allow everyone with any access to the organizer. allow everyone with any access to the organizer.

View File

@@ -38,6 +38,7 @@ import operator
import secrets import secrets
from datetime import timedelta from datetime import timedelta
from functools import reduce from functools import reduce
from typing import Protocol
from django.conf import settings from django.conf import settings
from django.contrib.auth.models import ( from django.contrib.auth.models import (
@@ -67,6 +68,14 @@ class EmailAddressTakenError(IntegrityError):
pass pass
class PermissionHolder(Protocol):
def has_event_permission(self, organizer, event, perm_name=None, request=None, session_key=None) -> bool:
...
def has_organizer_permission(self, organizer, perm_name=None, request=None):
...
class UserManager(BaseUserManager): class UserManager(BaseUserManager):
""" """
This is the user manager for our custom user model. See the User This is the user manager for our custom user model. See the User

View File

@@ -229,7 +229,7 @@ class Device(LoggedModel):
""" """
return self._organizer_permission_set() if self.organizer == organizer else set() return self._organizer_permission_set() if self.organizer == organizer else set()
def has_event_permission(self, organizer, event, perm_name=None, request=None) -> bool: def has_event_permission(self, organizer, event, perm_name=None, request=None, session_key=None) -> bool:
""" """
Checks if this token is part of a team that grants access of type ``perm_name`` Checks if this token is part of a team that grants access of type ``perm_name``
to the event ``event``. to the event ``event``.
@@ -238,6 +238,7 @@ class Device(LoggedModel):
:param event: The event to check :param event: The event to check
:param perm_name: The permission, e.g. ``event.orders:read`` :param perm_name: The permission, e.g. ``event.orders:read``
:param request: This parameter is ignored and only defined for compatibility reasons. :param request: This parameter is ignored and only defined for compatibility reasons.
:param session_key: This parameter is ignored and only defined for compatibility reasons.
:return: bool :return: bool
""" """
has_event_access = (self.all_events and organizer == self.organizer) or ( has_event_access = (self.all_events and organizer == self.organizer) or (

View File

@@ -319,6 +319,9 @@ class TeamQuerySet(models.QuerySet):
def event_permission_q(cls, perm_name): def event_permission_q(cls, perm_name):
from ..permissions import assert_valid_event_permission from ..permissions import assert_valid_event_permission
if perm_name is None:
return Q()
if perm_name.startswith('can_') and perm_name in OLD_TO_NEW_EVENT_COMPAT: # legacy if perm_name.startswith('can_') and perm_name in OLD_TO_NEW_EVENT_COMPAT: # legacy
return reduce(operator.and_, [cls.event_permission_q(p) for p in OLD_TO_NEW_EVENT_COMPAT[perm_name]]) return reduce(operator.and_, [cls.event_permission_q(p) for p in OLD_TO_NEW_EVENT_COMPAT[perm_name]])
assert_valid_event_permission(perm_name, allow_legacy=False) assert_valid_event_permission(perm_name, allow_legacy=False)
@@ -331,6 +334,9 @@ class TeamQuerySet(models.QuerySet):
def organizer_permission_q(cls, perm_name): def organizer_permission_q(cls, perm_name):
from ..permissions import assert_valid_organizer_permission from ..permissions import assert_valid_organizer_permission
if perm_name is None:
return Q()
if perm_name.startswith('can_') and perm_name in OLD_TO_NEW_ORGANIZER_COMPAT: # legacy if perm_name.startswith('can_') and perm_name in OLD_TO_NEW_ORGANIZER_COMPAT: # legacy
return reduce(operator.and_, [cls.organizer_permission_q(p) for p in OLD_TO_NEW_ORGANIZER_COMPAT[perm_name]]) return reduce(operator.and_, [cls.organizer_permission_q(p) for p in OLD_TO_NEW_ORGANIZER_COMPAT[perm_name]])
assert_valid_organizer_permission(perm_name, allow_legacy=False) assert_valid_organizer_permission(perm_name, allow_legacy=False)
@@ -550,7 +556,7 @@ class TeamAPIToken(models.Model):
""" """
return self.team.organizer_permission_set() if self.team.organizer == organizer else set() return self.team.organizer_permission_set() if self.team.organizer == organizer else set()
def has_event_permission(self, organizer, event, perm_name=None, request=None) -> bool: def has_event_permission(self, organizer, event, perm_name=None, request=None, session_key=None) -> bool:
""" """
Checks if this token is part of a team that grants access of type ``perm_name`` Checks if this token is part of a team that grants access of type ``perm_name``
to the event ``event``. to the event ``event``.
@@ -559,6 +565,7 @@ class TeamAPIToken(models.Model):
:param event: The event to check :param event: The event to check
:param perm_name: The permission, e.g. ``event.orders:read`` :param perm_name: The permission, e.g. ``event.orders:read``
:param request: This parameter is ignored and only defined for compatibility reasons. :param request: This parameter is ignored and only defined for compatibility reasons.
:param session_key: This parameter is ignored and only defined for compatibility reasons.
:return: bool :return: bool
""" """
has_event_access = (self.team.all_events and organizer == self.team.organizer) or ( has_event_access = (self.team.all_events and organizer == self.team.organizer) or (

View File

@@ -211,7 +211,12 @@ def init_event_exporters(event, user=None, token=None, device=None, request=None
if not perm_holder.has_event_permission(event.organizer, event, permission_name, request) and not staff_session: if not perm_holder.has_event_permission(event.organizer, event, permission_name, request) and not staff_session:
continue continue
exporter: BaseExporter = response(event=event, organizer=event.organizer, **kwargs) exporter: BaseExporter = response(
event=event,
organizer=event.organizer,
permission_holder=token or device or user,
**kwargs
)
if not exporter.available_for_user(user if user and user.is_authenticated else None): if not exporter.available_for_user(user if user and user.is_authenticated else None):
continue continue
@@ -243,7 +248,12 @@ def init_organizer_exporters(
continue continue
if issubclass(response, OrganizerLevelExportMixin): if issubclass(response, OrganizerLevelExportMixin):
exporter: BaseExporter = response(event=Event.objects.none(), organizer=organizer, **kwargs) exporter: BaseExporter = response(
event=Event.objects.none(),
organizer=organizer,
permission_holder=token or device or user,
**kwargs,
)
try: try:
if not perm_holder.has_organizer_permission(organizer, response.get_required_organizer_permission(), request) and not staff_session: if not perm_holder.has_organizer_permission(organizer, response.get_required_organizer_permission(), request) and not staff_session:
@@ -295,7 +305,12 @@ def init_organizer_exporters(
if not _has_permission_on_any_team_cache[permission_name] and not staff_session: if not _has_permission_on_any_team_cache[permission_name] and not staff_session:
continue continue
exporter: BaseExporter = response(event=_event_list_cache[permission_name], organizer=organizer, **kwargs) exporter: BaseExporter = response(
event=_event_list_cache[permission_name],
organizer=organizer,
permission_holder=token or device or user,
**kwargs,
)
if not exporter.available_for_user(user if user and user.is_authenticated else None): if not exporter.available_for_user(user if user and user.is_authenticated else None):
continue continue